Stream<T>.fromFutures constructor

Stream<T>.fromFutures(
  Iterable<Future<T>> futures
)

Creates a single-subscription stream from a group of futures.
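As a minimal sketch of what "single-subscription" means in practice, the snippet below listens to the same stream twice and checks that the second attempt fails. The helper name `secondListenThrows` is hypothetical and exists only for this illustration.

```dart
import 'dart:async';

/// Returns true if a second call to listen() on the same stream
/// throws a StateError. (Hypothetical helper, for illustration only.)
bool secondListenThrows() {
  final stream = Stream<int>.fromFutures([Future.value(1)]);
  stream.listen((_) {}); // The first listener is accepted.
  try {
    stream.listen((_) {}); // A second listener is rejected.
    return false;
  } on StateError {
    return true;
  }
}

void main() {
  print(secondListenThrows()); // true
}
```

If several listeners are needed, one option is to convert the stream with `asBroadcastStream()` before listening.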

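The stream reports the results of the futures in the order in which the futures complete. Each future provides either a data event or an error event, depending on how the future completes.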
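The ordering and the data/error distinction can be illustrated with a small sketch. The delays, the `StateError('boom')` failure, and the helper name `collectEvents` are assumptions chosen for the illustration; they are not part of the API.

```dart
import 'dart:async';

/// Collects a textual log of the events emitted by the stream.
/// (Hypothetical helper, for illustration only.)
Future<List<String>> collectEvents() async {
  // Listed slowest first, so list order differs from completion order.
  final slow = Future<int>.delayed(const Duration(milliseconds: 60), () => 1);
  final fast = Future<int>.delayed(const Duration(milliseconds: 20), () => 2);
  final failing = Future<int>.delayed(
      const Duration(milliseconds: 40), () => throw StateError('boom'));

  final events = <String>[];
  final done = Completer<void>();
  Stream<int>.fromFutures([slow, fast, failing]).listen(
    (value) => events.add('data: $value'),
    onError: (Object error) => events.add('error: $error'),
    onDone: done.complete,
  );
  await done.future;
  return events;
}

Future<void> main() async {
  print(await collectEvents());
  // [data: 2, error: Bad state: boom, data: 1]
}
```

Note that the error event does not end the stream here: `listen` is called without `cancelOnError`, so the remaining future still delivers its value.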

If some futures have already completed when Stream.fromFutures is called, their results will be emitted in some unspecified order.
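Because that order is unspecified, code consuming already-completed futures is probably safest when it does not depend on event order. The sketch below compares the results as a set; the helper name `collectCompleted` is hypothetical.

```dart
import 'dart:async';

/// Gathers the values of futures that completed before the stream was
/// created, ignoring the order of arrival.
/// (Hypothetical helper, for illustration only.)
Future<Set<int>> collectCompleted() async {
  final futures = [Future.value(1), Future.value(2), Future.value(3)];
  // Make sure every future has completed before building the stream.
  await Future.wait(futures);

  final values = await Stream<int>.fromFutures(futures).toList();
  return values.toSet(); // Order-insensitive view of the results.
}

Future<void> main() async {
  final values = await collectCompleted();
  print(values.length == 3 && values.containsAll([1, 2, 3])); // true
}
```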

When all futures have completed, the stream is closed.

If futures is empty, the stream closes as soon as possible.
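For the empty case, a short sketch suggests what "as soon as possible" looks like to a listener: no data events arrive, and the done event is delivered asynchronously, after `listen` has returned. The helper name `observeEmpty` is hypothetical.

```dart
import 'dart:async';

/// Records what a listener observes on a stream built from no futures.
/// (Hypothetical helper, for illustration only.)
Future<List<String>> observeEmpty() async {
  final log = <String>[];
  final done = Completer<void>();
  Stream<int>.fromFutures(<Future<int>>[]).listen(
    (value) => log.add('data: $value'), // Never called for an empty input.
    onDone: () {
      log.add('done');
      done.complete();
    },
  );
  log.add('listen returned');
  await done.future;
  return log;
}

Future<void> main() async {
  print(await observeEmpty()); // [listen returned, done]
}
```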

Example

Future<int> waitTask() async {
  await Future.delayed(const Duration(seconds: 2));
  return 10;
}

Future<String> doneTask() async {
  await Future.delayed(const Duration(seconds: 5));
  return 'Future complete';
}

final stream = Stream<Object>.fromFutures([doneTask(), waitTask()]);
stream.listen(print, onDone: () => print('Done'), onError: print);

// Outputs:
// 10 after 'waitTask' finished.
// "Future complete" after 'doneTask' finished.
// "Done" when stream completed.

Implementation

factory Stream.fromFutures(Iterable<Future<T>> futures) {
  _StreamController<T> controller =
      new _SyncStreamController<T>(null, null, null, null);
  int count = 0;
  // Declare these as variables holding closures instead of as
  // function declarations.
  // This avoids creating a new closure from the functions for each future.
  void onValue(T value) {
    if (!controller.isClosed) {
      controller._add(value);
      if (--count == 0) controller._closeUnchecked();
    }
  }

  void onError(Object error, StackTrace stack) {
    if (!controller.isClosed) {
      controller._addError(error, stack);
      if (--count == 0) controller._closeUnchecked();
    }
  }

  // The futures are already running, so start listening to them immediately
  // (instead of waiting for the stream to be listened on).
  // If we wait, we might not catch errors in the futures in time.
  for (var future in futures) {
    count++;
    future.then(onValue, onError: onError);
  }
  // Use schedule microtask since controller is sync.
  if (count == 0) scheduleMicrotask(controller.close);
  return controller.stream;
}
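The implementation above relies on private members of dart:async (`_SyncStreamController`, `_add`, `_closeUnchecked`), so it cannot be copied into application code. The same counting technique can be approximated with the public `StreamController` API, as in the rough sketch below. The function name `streamFromFutures` and the counter name `pending` are hypothetical, and this is not the SDK's code.

```dart
import 'dart:async';

/// An approximation of Stream.fromFutures built on the public API.
/// (Hypothetical function, for illustration only.)
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) {
  final controller = StreamController<T>(sync: true);
  var pending = 0; // Number of futures that have not completed yet.

  // Close the stream once the last outstanding future has reported.
  void finishOne() {
    if (--pending == 0) controller.close();
  }

  // Subscribe to every future immediately so that errors are handled
  // even if the stream itself is listened to later.
  for (final future in futures) {
    pending++;
    future.then((value) {
      if (controller.isClosed) return;
      controller.add(value);
      finishOne();
    }, onError: (Object error, StackTrace stack) {
      if (controller.isClosed) return;
      controller.addError(error, stack);
      finishOne();
    });
  }
  // With no futures, close asynchronously rather than during the call.
  if (pending == 0) scheduleMicrotask(controller.close);
  return controller.stream;
}

Future<void> main() async {
  final values =
      await streamFromFutures([Future.value('a'), Future.value('b')]).toList();
  print(values.length); // 2
}
```

Events added before anyone listens are buffered by the controller, which is why the futures can be subscribed to eagerly.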