Stream<T>.fromFutures 构造函数
从一组 futures 创建一个单订阅流。
流报告 futures 的结果,顺序与 futures 完成顺序一致。每个 futures 提供一个数据事件或错误事件,具体取决于 futures 的完成方式。
如果调用 Stream.fromFutures
时某些 futures 已经完成,它们的将按某种未指定的顺序发出结果。
当所有 futures 都完成时,流将关闭。
如果 futures
为空,流会尽快关闭。
示例
Future<int> waitTask() async {
await Future.delayed(const Duration(seconds: 2));
return 10;
}
Future<String> doneTask() async {
await Future.delayed(const Duration(seconds: 5));
return 'Future complete';
}
final stream = Stream<Object>.fromFutures([doneTask(), waitTask()]);
stream.listen(print, onDone: () => print('Done'), onError: print);
// Outputs:
// 10 after 'waitTask' finished.
// "Future complete" after 'doneTask' finished.
// "Done" when stream completed.
实现
factory Stream.fromFutures(Iterable<Future<T>> futures) {
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
int count = 0;
// Declare these as variables holding closures instead of as
// function declarations.
// This avoids creating a new closure from the functions for each future.
void onValue(T value) {
if (!controller.isClosed) {
controller._add(value);
if (--count == 0) controller._closeUnchecked();
}
}
void onError(Object error, StackTrace stack) {
if (!controller.isClosed) {
controller._addError(error, stack);
if (--count == 0) controller._closeUnchecked();
}
}
// The futures are already running, so start listening to them immediately
// (instead of waiting for the stream to be listened on).
// If we wait, we might not catch errors in the futures in time.
for (var future in futures) {
count++;
future.then(onValue, onError: onError);
}
// Use schedule microtask since controller is sync.
if (count == 0) scheduleMicrotask(controller.close);
return controller.stream;
}