StreamTransformer<S, T> 构造函数

const StreamTransformer<S, T>(
  1. StreamSubscription<T> onListen(
    1. Stream<S> stream,
    2. bool cancelOnError
    )
)

根据给定的 onListen 回调创建一个 StreamTransformer

返回的流转换器在监听转换流时使用提供的 onListen 回调。此时,回调将接收到输入流(传递给 bind 的流)和一个布尔标志 cancelOnError 以创建一个 StreamSubscription

如果转换后的流是广播流,那么通过此转换器的 StreamTransformer.bind 方法返回的流也是如此。

如果转换后的流被多次监听,则对于每个新的 Stream.listen 调用,都会再次调用 onListen 回调。这发生在流是广播流还是非广播流的情况下,但对于非广播流,通常调用会失败。

onListen 回调不会接收到传递给 Stream.listen 的处理程序。这些处理程序会在 onListen 回调调用后自动设置(使用 StreamSubscription.onDataStreamSubscription.onErrorStreamSubscription.onDone)。

最常见的情况是,onListen 回调首先在提供的流上调用 Stream.listen(带有相应的 cancelOnError 标志),然后返回一个新的 StreamSubscription

创建 StreamSubscription 有两种常见方式

  1. 通过分配一个 StreamController 并返回监听其流的操作结果。注意,需要转发暂停、恢复和取消事件(除非转换器有意改变此行为)。
  2. 通过创建一个实现 StreamSubscription 的新类。注意,订阅应该在流被监听的 Zone 中运行回调(请参阅 ZoneZone.bindCallback)。

示例

/// Starts listening to [input] and duplicates all non-error events.
StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
  // Create the result controller.
  // Using `sync` is correct here, since only async events are forwarded.
  var controller = StreamController<int>(sync: true);
  controller.onListen = () {
    var subscription = input.listen((data) {
      // Duplicate the data.
      controller.add(data);
      controller.add(data);
    },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);
    // Controller forwards pause, resume and cancel events.
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = subscription.cancel;
  };
  // Return a new [StreamSubscription] by listening to the controller's
  // stream.
  return controller.stream.listen(null);
}

// Instantiate a transformer:
var duplicator = const StreamTransformer<int, int>(_onListen);

// Use as follows:
intStream.transform(duplicator);

实现

const factory StreamTransformer(
        StreamSubscription<T> onListen(
            Stream<S> stream, bool cancelOnError)) =
    _StreamSubscriptionTransformer<S, T>;