Stream<T>.fromFutures constructor

Stream<T>.fromFutures(
  Iterable<Future<T>> futures
)

Creates a single-subscription stream from a group of futures.
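
As a minimal sketch of what "single-subscription" means in practice: the stream accepts one listener over its whole lifetime, and a second `listen` call throws a `StateError`. The helper name `secondListenThrows` is hypothetical and exists only for this illustration.

```dart
/// Returns true if a second `listen` call on the stream is rejected.
/// (Hypothetical helper, used only to illustrate the behaviour.)
Future<bool> secondListenThrows() async {
  final stream = Stream<int>.fromFutures([
    Future<int>.value(1),
    Future<int>.value(2),
  ]);

  // The first subscription is accepted.
  final subscription = stream.listen((_) {});

  var rejected = false;
  try {
    // A single-subscription stream allows only one listener in total,
    // so this call throws synchronously.
    stream.listen((_) {});
  } on StateError {
    rejected = true;
  }

  await subscription.cancel();
  return rejected;
}
```

Calling `await secondListenThrows()` from an async `main` is expected to return `true`. If several consumers need the same events, `asBroadcastStream()` is one option.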

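The stream reports the results of the futures in the order in which the futures complete. Each future provides either a data event or an error event, depending on how the future completes.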
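
The following sketch illustrates both points: events arrive in completion order rather than list order, and a failing future produces an error event without ending the stream. The delays, the variable names, and the helper `collectEvents` are assumptions chosen for the illustration.

```dart
import 'dart:async';

/// Records every event emitted by the stream as a short description.
/// (Hypothetical helper, used only to illustrate the behaviour.)
Future<List<String>> collectEvents() async {
  // Listed first, but completes last.
  final slowValue =
      Future<int>.delayed(const Duration(milliseconds: 300), () => 1);
  // Completes first, with an error.
  final fastError = Future<int>.delayed(
      const Duration(milliseconds: 100), () => throw StateError('failed'));
  final midValue =
      Future<int>.delayed(const Duration(milliseconds: 200), () => 2);

  final events = <String>[];
  final done = Completer<void>();

  Stream<int>.fromFutures([slowValue, fastError, midValue]).listen(
    (value) => events.add('data: $value'),
    // With the default cancelOnError: false, the stream keeps going.
    onError: (Object error) => events.add('error: $error'),
    onDone: done.complete,
  );

  await done.future;
  return events;
}
```

Under these assumptions the list starts with the error event, followed by `data: 2` and then `data: 1`.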

If some futures have already completed when Stream.fromFutures is called, their results will be emitted in some unspecified order.
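
Because that order is unspecified, code that compares results is safer if it normalizes them first. The sketch below, with the hypothetical helper `sortedResults`, sorts the collected values before returning them.

```dart
/// Collects results from futures that completed before the stream existed.
/// (Hypothetical helper, used only to illustrate the behaviour.)
Future<List<int>> sortedResults() async {
  final futures = <Future<int>>[
    Future<int>.value(3),
    Future<int>.value(1),
    Future<int>.value(2),
  ];

  // Make sure every future has completed before the stream is created.
  await Future.wait(futures);

  final results = await Stream<int>.fromFutures(futures).toList();

  // The emission order is unspecified here, so sort before comparing.
  results.sort();
  return results;
}
```

After sorting, the result is `[1, 2, 3]` regardless of the order in which the stream happened to emit the values.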

When all futures have completed, the stream is closed.

If futures is empty, the stream closes as soon as possible.
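
A small sketch of the empty case, using the hypothetical helper `drainEmpty`: the stream emits no events and simply closes, so collecting it yields an empty list.

```dart
/// Drains a stream built from an empty collection of futures.
/// (Hypothetical helper, used only to illustrate the behaviour.)
Future<List<int>> drainEmpty() {
  // No futures to wait for: the stream only delivers its done event.
  return Stream<int>.fromFutures([]).toList();
}
```

`await drainEmpty()` completes with an empty list instead of hanging.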

Example

Future<int> waitTask() async {
  await Future.delayed(const Duration(seconds: 2));
  return 10;
}

Future<String> doneTask() async {
  await Future.delayed(const Duration(seconds: 5));
  return 'Future complete';
}

void main() {
  final stream = Stream<Object>.fromFutures([doneTask(), waitTask()]);
  stream.listen(print, onDone: () => print('Done'), onError: print);

  // Outputs:
  // 10 after about 2 seconds, when 'waitTask' finishes.
  // "Future complete" after about 5 seconds, when 'doneTask' finishes.
  // "Done" once both futures have completed and the stream closes.
}

Implementation

factory Stream.fromFutures(Iterable<Future<T>> futures) {
  _StreamController<T> controller =
      new _SyncStreamController<T>(null, null, null, null);
  int count = 0;
  // Declare these as variables holding closures instead of as
  // function declarations.
  // This avoids creating a new closure from the functions for each future.
  void onValue(T value) {
    if (!controller.isClosed) {
      controller._add(value);
      if (--count == 0) controller._closeUnchecked();
    }
  }

  void onError(Object error, StackTrace stack) {
    if (!controller.isClosed) {
      controller._addError(error, stack);
      if (--count == 0) controller._closeUnchecked();
    }
  }

  // The futures are already running, so start listening to them immediately
  // (instead of waiting for the stream to be listened on).
  // If we wait, we might not catch errors in the futures in time.
  for (var future in futures) {
    count++;
    future.then(onValue, onError: onError);
  }
  // Use schedule microtask since controller is sync.
  if (count == 0) scheduleMicrotask(controller.close);
  return controller.stream;
}
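
The comments in the implementation note that the futures are listened to immediately, not when the stream gets its first listener. The sketch below illustrates the observable consequence under that reading: results and errors that arrive before anyone listens are held by the stream and delivered once a listener attaches. The helper name `lateListener` and the 50 ms delay are assumptions made for the illustration.

```dart
import 'dart:async';

/// Attaches a listener only after all futures have completed.
/// (Hypothetical helper, used only to illustrate the behaviour.)
Future<List<String>> lateListener() async {
  final stream = Stream<int>.fromFutures([
    Future<int>.value(1),
    Future<int>.error(StateError('early failure')),
  ]);

  // Both futures complete well before the stream has a listener.
  await Future<void>.delayed(const Duration(milliseconds: 50));

  final events = <String>[];
  final done = Completer<void>();
  stream.listen(
    (value) => events.add('data'),
    onError: (Object error) => events.add('error'),
    onDone: done.complete,
  );

  await done.future;
  // Sort because the relative order of the two events is not the point here.
  events.sort();
  return events;
}
```

The late listener still receives one data event and one error event, and the early failure does not surface as an unhandled asynchronous error.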