StreamTransformer<S, T> 构造函数

const StreamTransformer<S, T>(
  1. StreamSubscription<T> onListen(
    1. Stream<S> stream,
    2. bool cancelOnError
    )
)

基于给定的 onListen 回调创建一个 StreamTransformer

返回的流转换器在监听转换流时使用提供的 onListen 回调。此时,回调接收输入流(传递给 bind 的那个)和一个布尔标志 cancelOnError 来创建一个 StreamSubscription

如果转换流是一个广播流,那么通过此转换器的 StreamTransformer.bind 方法返回的流也是广播流。

如果转换流被多次监听,则对于每个新的 Stream.listen 调用,都会再次调用 onListen 回调。这既适用于广播流,也适用于非广播流,但调用通常会失败。

onListen 回调不接收传递给 Stream.listen 的处理程序。这些处理程序在调用 onListen 回调后自动设置(使用 StreamSubscription.onDataStreamSubscription.onErrorStreamSubscription.onDone)。

最常见的是,onListen 回调首先在提供的流上调用 Stream.listen(带有相应的 cancelOnError 标志),然后返回一个新 StreamSubscription

创建 StreamSubscription 有两种常见方式

  1. 通过分配一个 StreamController 并返回其流的监听结果。重要的是要转发暂停、恢复和取消事件(除非转换器故意想要改变这种行为)。
  2. 通过创建一个新的实现了 StreamSubscription 的类。注意,订阅应该在流被监听到的 Zone 中运行回调(参见 ZoneZone.bindCallback))。

示例

/// Starts listening to [input] and duplicates all non-error events.
StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
  // Create the result controller.
  // Using `sync` is correct here, since only async events are forwarded.
  var controller = StreamController<int>(sync: true);
  controller.onListen = () {
    var subscription = input.listen((data) {
      // Duplicate the data.
      controller.add(data);
      controller.add(data);
    },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);
    // Controller forwards pause, resume and cancel events.
    controller
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = subscription.cancel;
  };
  // Return a new [StreamSubscription] by listening to the controller's
  // stream.
  return controller.stream.listen(null);
}

// Instantiate a transformer:
var duplicator = const StreamTransformer<int, int>(_onListen);

// Use as follows:
intStream.transform(duplicator);

实现

const factory StreamTransformer(
        StreamSubscription<T> onListen(
            Stream<S> stream, bool cancelOnError)) =
    _StreamSubscriptionTransformer<S, T>;