Stream<T>.multi 构造函数

  1. @Since("2.9")
Stream<T>.multi(
  1. void onListen(
    1. MultiStreamController<T>
    ),
  2. {bool isBroadcast = false}
)

创建一个多订阅流。

每次创建的流被订阅时,都会调用 onListen 回调,传递一个新的 MultiStreamController,它将该 StreamSubscription 返回的事件转发。

这允许将每个监听器视为一个单独的流。

MultiStreamController 不支持读取其 StreamController.stream。设置其 StreamController.onListen 没有影响,因为已调用 onListen 回调,并且不会再次调用 StreamController.onListen。控制器表现得像异步控制器,但提供了同步发送事件的方法。

如果将 isBroadcast 设置为 true,则返回的流的 Stream.isBroadcast 将为 true。这对流的操作没有影响,如果 onListen 函数声明自己是广播流,则由该函数来执行广播函数。

多订阅流可以像任何其他流一样操作。如果 onListen 回调在第一次调用后抛出异常,流 behaves like a single-subscription stream. 如果流向所有当前监听器播出相同的事件,它 behaves like a broadcast stream.

它还可以选择向不同监听器发出不同的事件。例如,重复发出最新非 null 事件的流可以实现为以下示例

extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
  Stream<T> repeatLatest() {
    var done = false;
    T? latest = null;
    var currentListeners = <MultiStreamController<T>>{};
    this.listen((event) {
      latest = event;
      for (var listener in [...currentListeners]) listener.addSync(event);
    }, onError: (Object error, StackTrace stack) {
      for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
    }, onDone: () {
      done = true;
      latest = null;
      for (var listener in currentListeners) listener.closeSync();
      currentListeners.clear();
    });
    return Stream.multi((controller) {
      if (done) {
        controller.close();
        return;
      }
      currentListeners.add(controller);
      var latestValue = latest;
      if (latestValue != null) controller.add(latestValue);
      controller.onCancel = () {
        currentListeners.remove(controller);
      };
    });
  }
}

实现

@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
    {bool isBroadcast = false}) {
  return _MultiStream<T>(onListen, isBroadcast);
}