Stream<T>.multi 构造函数
- @Since("2.9")
- void onListen( ),
- {bool isBroadcast = false}
创建一个多订阅流。
每次创建的流被订阅时,都会调用 onListen
回调,传递一个新的 MultiStreamController,它将该 StreamSubscription 返回的事件转发。
这允许将每个监听器视为一个单独的流。
MultiStreamController 不支持读取其 StreamController.stream。设置其 StreamController.onListen 没有影响,因为已调用 onListen
回调,并且不会再次调用 StreamController.onListen。控制器表现得像异步控制器,但提供了同步发送事件的方法。
如果将 isBroadcast
设置为 true
,则返回的流的 Stream.isBroadcast 将为 true
。这对流的操作没有影响,如果 onListen
函数声明自己是广播流,则由该函数来执行广播函数。
多订阅流可以像任何其他流一样操作。如果 onListen
回调在第一次调用后抛出异常,流 behaves like a single-subscription stream. 如果流向所有当前监听器播出相同的事件,它 behaves like a broadcast stream.
它还可以选择向不同监听器发出不同的事件。例如,重复发出最新非 null
事件的流可以实现为以下示例
extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T? latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
var latestValue = latest;
if (latestValue != null) controller.add(latestValue);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
实现
@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
{bool isBroadcast = false}) {
return _MultiStream<T>(onListen, isBroadcast);
}