Stream<T>.fromFutures 构造函数
根据一组future创建单订阅流的构造函数。
该流会按照future完成的顺序报告future的结果。每个future提供数据事件或错误事件,具体取决于future如何完成。
如果在调用Stream.fromFutures时,某些future已经完成,它们的结果将按照某种未指定的顺序发出。
当所有future都完成后,流将被关闭。
如果futures为空,流将尽快关闭。
示例
/// Example task: completes with `10` after a two-second delay.
Future<int> waitTask() async {
  const delay = Duration(seconds: 2);
  await Future<void>.delayed(delay);
  return 10;
}
/// Example task: completes with `'Future complete'` after a five-second delay.
Future<String> doneTask() async {
  const delay = Duration(seconds: 5);
  await Future<void>.delayed(delay);
  return 'Future complete';
}
// Both tasks are started here, in list order; the stream reports each result
// when its future completes, so the shorter `waitTask` is printed first.
final Stream<Object> stream = Stream.fromFutures([doneTask(), waitTask()]);
stream.listen(
  print,
  onError: print,
  onDone: () => print('Done'),
);
// Outputs:
// 10 once 'waitTask' has finished.
// "Future complete" once 'doneTask' has finished.
// "Done" once the stream has closed.
实现
/// Creates a single-subscription stream from a group of futures.
///
/// Every future in [futures] is listened to immediately. When a future
/// completes with a value, that value is added to the stream as a data event;
/// when it completes with an error, the error and its stack trace are added
/// as an error event. Events therefore appear in completion order, not in
/// the iteration order of [futures].
///
/// The stream is closed once the last pending future has reported. If
/// [futures] is empty, the close is scheduled in a microtask.
factory Stream.fromFutures(Iterable<Future<T>> futures) {
  final _StreamController<T> controller =
      _SyncStreamController<T>(null, null, null, null);
  // Number of futures that have been subscribed to but have not reported yet.
  var count = 0;
  // Both handlers are declared once, outside the loop, and the same two
  // functions are passed to every future.
  void onValue(T value) {
    if (!controller.isClosed) {
      controller._add(value);
      // The last future to report also closes the stream.
      if (--count == 0) controller._closeUnchecked();
    }
  }

  void onError(Object error, StackTrace stack) {
    if (!controller.isClosed) {
      controller._addError(error, stack);
      // The last future to report also closes the stream.
      if (--count == 0) controller._closeUnchecked();
    }
  }

  // The futures are already running, so start listening to them immediately
  // (instead of waiting for the stream to be listened on).
  // If we wait, we might not catch errors in the futures in time.
  for (var future in futures) {
    count++;
    future.then(onValue, onError: onError);
  }
  // No futures at all: nothing will ever trigger the close above, so close
  // here. Use schedule microtask since controller is sync.
  if (count == 0) scheduleMicrotask(controller.close);
  return controller.stream;
}