Stream<T>.fromIterable 构造函数
- Iterable<
T> elements
创建一个从 elements
获取数据的流。
当流接收到监听器时,迭代器会遍历可迭代对象,如果监听器取消了订阅,或者 Iterator.moveNext 方法返回 false
或抛出异常,则停止迭代。在流订阅暂停期间,迭代会被挂起。
如果在 elements.iterator
上调用 Iterator.moveNext 抛出异常,流会发出该错误然后关闭。如果在 elements.iterator
上读取 Iterator.current 抛出异常,流会发出该错误,但会继续迭代。
可以被多次监听。每个监听器都会独立迭代 elements
。
示例
final numbers = [1, 2, 3, 5, 6, 7];
final stream = Stream.fromIterable(numbers);
实现
factory Stream.fromIterable(Iterable<T> elements) =>
Stream<T>.multi((controller) {
Iterator<T> iterator;
try {
iterator = elements.iterator;
} catch (e, s) {
controller.addError(e, s);
controller.close();
return;
}
var zone = Zone.current;
var isScheduled = true;
void next() {
if (!controller.hasListener || controller.isPaused) {
// Cancelled or paused since scheduled.
isScheduled = false;
return;
}
bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
controller.addErrorSync(e, s);
controller.closeSync();
return;
}
if (hasNext) {
try {
controller.addSync(iterator.current);
} catch (e, s) {
controller.addErrorSync(e, s);
}
if (controller.hasListener && !controller.isPaused) {
zone.scheduleMicrotask(next);
} else {
isScheduled = false;
}
} else {
controller.closeSync();
}
}
controller.onResume = () {
if (!isScheduled) {
isScheduled = true;
zone.scheduleMicrotask(next);
}
};
zone.scheduleMicrotask(next);
});