Stream<T>.multi 构造函数

  1. @Since("2.9")
Stream<T>.multi(
  1. void onListen(
    1. MultiStreamController<T>
    ), {
  2. bool isBroadcast = false,
})

创建一个多订阅流。

每次创建的流被监听时,都会使用一个新创建的 MultiStreamController 来调用 onListen 回调,并将事件转发到该 StreamSubscription

这使得每个监听器都可以被视为一个单独的流。

MultiStreamController 不支持读取其 StreamController.stream。设置其 StreamController.onListen 无效,因为会调用 onListen 回调,而 StreamController.onListen 不会被后续调用。控制器像异步控制器一样工作,但提供了额外的同步传递事件的方法。

如果将 isBroadcast 设置为 true,则返回的流的 Stream.isBroadcast 将为 true。这不会影响流的行为,是否将其作为广播流处理取决于 onListen 函数。

多订阅流可以像其他任何流一样表现。如果 onListen 回调在第一次调用之后抛出异常,则流的行为像一个单订阅流。如果流向所有当前监听器发出相同的事件,则它的行为像一个广播流。

它还可以选择向不同的监听器发出不同的事件。例如,一个流可以重复最近发生的非 null 事件给新监听器,如下所示实现

extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
  Stream<T> repeatLatest() {
    var done = false;
    T? latest = null;
    var currentListeners = <MultiStreamController<T>>{};
    this.listen((event) {
      latest = event;
      for (var listener in [...currentListeners]) listener.addSync(event);
    }, onError: (Object error, StackTrace stack) {
      for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
    }, onDone: () {
      done = true;
      latest = null;
      for (var listener in currentListeners) listener.closeSync();
      currentListeners.clear();
    });
    return Stream.multi((controller) {
      if (done) {
        controller.close();
        return;
      }
      currentListeners.add(controller);
      var latestValue = latest;
      if (latestValue != null) controller.add(latestValue);
      controller.onCancel = () {
        currentListeners.remove(controller);
      };
    });
  }
}

实现

@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
    {bool isBroadcast = false}) {
  return _MultiStream<T>(onListen, isBroadcast);
}