Stream<T>.fromIterable 构造函数

Stream<T>.fromIterable(
  1. Iterable<T> elements
)

创建一个从 elements 获取数据的流。

当流接收到监听器时,迭代器会遍历可迭代对象,如果监听器取消了订阅,或者 Iterator.moveNext 方法返回 false 或抛出异常,则停止迭代。在流订阅暂停期间,迭代会被挂起。

如果在 elements.iterator 上调用 Iterator.moveNext 抛出异常,流会发出该错误然后关闭。如果在 elements.iterator 上读取 Iterator.current 抛出异常,流会发出该错误,但会继续迭代。

可以被多次监听。每个监听器都会独立迭代 elements

示例

final numbers = [1, 2, 3, 5, 6, 7];
final stream = Stream.fromIterable(numbers);

实现

factory Stream.fromIterable(Iterable<T> elements) =>
    Stream<T>.multi((controller) {
      Iterator<T> iterator;
      try {
        iterator = elements.iterator;
      } catch (e, s) {
        controller.addError(e, s);
        controller.close();
        return;
      }
      var zone = Zone.current;
      var isScheduled = true;

      void next() {
        if (!controller.hasListener || controller.isPaused) {
          // Cancelled or paused since scheduled.
          isScheduled = false;
          return;
        }
        bool hasNext;
        try {
          hasNext = iterator.moveNext();
        } catch (e, s) {
          controller.addErrorSync(e, s);
          controller.closeSync();
          return;
        }
        if (hasNext) {
          try {
            controller.addSync(iterator.current);
          } catch (e, s) {
            controller.addErrorSync(e, s);
          }
          if (controller.hasListener && !controller.isPaused) {
            zone.scheduleMicrotask(next);
          } else {
            isScheduled = false;
          }
        } else {
          controller.closeSync();
        }
      }

      controller.onResume = () {
        if (!isScheduled) {
          isScheduled = true;
          zone.scheduleMicrotask(next);
        }
      };

      zone.scheduleMicrotask(next);
    });