StreamTransformer<S, T> 构造函数
- StreamSubscription<
T> onListen()
根据给定的 onListen
回调创建一个 StreamTransformer。
返回的流转换器在监听转换流时使用提供的 onListen
回调。此时,回调将接收到输入流(传递给 bind 的流)和一个布尔标志 cancelOnError
以创建一个 StreamSubscription。
如果转换后的流是广播流,那么通过此转换器的 StreamTransformer.bind 方法返回的流也是如此。
如果转换后的流被多次监听,则对于每个新的 Stream.listen 调用,都会再次调用 onListen
回调。这发生在流是广播流还是非广播流的情况下,但对于非广播流,通常调用会失败。
onListen
回调不会接收到传递给 Stream.listen 的处理程序。这些处理程序会在 onListen
回调调用后自动设置(使用 StreamSubscription.onData、StreamSubscription.onError 和 StreamSubscription.onDone)。
最常见的情况是,onListen
回调首先在提供的流上调用 Stream.listen(带有相应的 cancelOnError
标志),然后返回一个新的 StreamSubscription。
创建 StreamSubscription 有两种常见方式
- 通过分配一个 StreamController 并返回监听其流的操作结果。注意,需要转发暂停、恢复和取消事件(除非转换器有意改变此行为)。
- 通过创建一个实现 StreamSubscription 的新类。注意,订阅应该在流被监听的 Zone 中运行回调(请参阅 Zone 和 Zone.bindCallback)。
示例
/// Starts listening to [input] and duplicates all non-error events.
StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
// Create the result controller.
// Using `sync` is correct here, since only async events are forwarded.
var controller = StreamController<int>(sync: true);
controller.onListen = () {
var subscription = input.listen((data) {
// Duplicate the data.
controller.add(data);
controller.add(data);
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
// Controller forwards pause, resume and cancel events.
controller
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = subscription.cancel;
};
// Return a new [StreamSubscription] by listening to the controller's
// stream.
return controller.stream.listen(null);
}
// Instantiate a transformer:
var duplicator = const StreamTransformer<int, int>(_onListen);
// Use as follows:
intStream.transform(duplicator);
实现
const factory StreamTransformer(
StreamSubscription<T> onListen(
Stream<S> stream, bool cancelOnError)) =
_StreamSubscriptionTransformer<S, T>;