Stream<T>.fromIterable 构造函数
- Iterable<
T> elements
从 elements
中创建一个流。
当流收到监听器时迭代可迭代对象,如果监听器取消订阅,或者当 Iterator.moveNext 方法返回 false
或抛出异常,则停止迭代。在流订阅暂停期间,迭代将被挂起。
如果对 elements.iterator
的 Iterator.moveNext 方法调用抛出异常,则流会发出该错误并随后关闭。如果对 elements.iterator
的 Iterator.current 方法调用抛出异常,则流会发出该错误,但会继续迭代。
可以被监听多次。每个监听器都会独立迭代 elements
。
示例
final numbers = [1, 2, 3, 5, 6, 7];
final stream = Stream.fromIterable(numbers);
实现
factory Stream.fromIterable(Iterable<T> elements) =>
Stream<T>.multi((controller) {
Iterator<T> iterator;
try {
iterator = elements.iterator;
} catch (e, s) {
controller.addError(e, s);
controller.close();
return;
}
var zone = Zone.current;
var isScheduled = true;
void next() {
if (!controller.hasListener || controller.isPaused) {
// Cancelled or paused since scheduled.
isScheduled = false;
return;
}
bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
controller.addErrorSync(e, s);
controller.closeSync();
return;
}
if (hasNext) {
try {
controller.addSync(iterator.current);
} catch (e, s) {
controller.addErrorSync(e, s);
}
if (controller.hasListener && !controller.isPaused) {
zone.scheduleMicrotask(next);
} else {
isScheduled = false;
}
} else {
controller.closeSync();
}
}
controller.onResume = () {
if (!isScheduled) {
isScheduled = true;
zone.scheduleMicrotask(next);
}
};
zone.scheduleMicrotask(next);
});