Stream<T>.multi 构造函数
- @Since("2.9")
- void onListen(), {
- bool isBroadcast = false,
创建一个多订阅流。
每次创建的流被监听时,都会使用一个新创建的 MultiStreamController 来调用 onListen
回调,并将事件转发到该 StreamSubscription。
这使得每个监听器都可以被视为一个单独的流。
MultiStreamController 不支持读取其 StreamController.stream。设置其 StreamController.onListen 无效,因为会调用 onListen
回调,而 StreamController.onListen 不会被后续调用。控制器像异步控制器一样工作,但提供了额外的同步传递事件的方法。
如果将 isBroadcast
设置为 true
,则返回的流的 Stream.isBroadcast 将为 true
。这不会影响流的行为,是否将其作为广播流处理取决于 onListen
函数。
多订阅流可以像其他任何流一样表现。如果 onListen
回调在第一次调用之后抛出异常,则流的行为像一个单订阅流。如果流向所有当前监听器发出相同的事件,则它的行为像一个广播流。
它还可以选择向不同的监听器发出不同的事件。例如,一个流可以重复最近发生的非 null
事件给新监听器,如下所示实现
extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T? latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
var latestValue = latest;
if (latestValue != null) controller.add(latestValue);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
实现
@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
{bool isBroadcast = false}) {
return _MultiStream<T>(onListen, isBroadcast);
}