异步数据事件的来源。
Stream提供了一个接收一系列事件的途径。每个事件要么是数据事件,也称为流的元素,要么是错误事件,它表示发生了一些错误。当一个流已发出所有事件时,一个单独的“完成”事件通知监听器已达到末尾。
您通过调用一个async*
函数来产生一个流,然后该函数返回一个流。消耗该流将导致该函数发出事件,直到它结束,并且流关闭。您可以使用一个await for
循环消耗一个流,该循环在async
或async*
函数内部可用,或者通过在async*
函数中使用yield*
直接将事件转发。示例
Stream<T> optionalMap<T>(
Stream<T> source , [T Function(T)? convert]) async* {
if (convert == null) {
yield* source;
} else {
await for (var event in source) {
yield convert(event);
}
}
}
当调用此函数时,它立即返回一个Stream<T>
对象。然后不再发生任何事情,直到有人尝试消耗该流。那时,async*
函数的主体开始运行。如果省略了convert
函数,则yield*
将监听source
流并将所有事件(数据、错误等)都转发到返回的流中。当source
流关闭时,yield*
完成,并且optionalMap
函数的主体也结束。这关闭了返回的流。如果提供了convert
,则该函数将监听源流并进入一个await for
循环,循环反复地等待下一个数据事件。在数据事件上,它调用convert
并发出结果。如果源流没有发出错误事件,循环将在源流结束时结束,然后optionalMap
函数主体完成,这关闭了返回的流。从源流的一个错误事件,await for
重新抛出该错误,打破了循环。由于没有捕获,该错误随后到达optionalMap
函数主体的末尾。这使得错误被发射到返回的流中,这导致流关闭。
('.')
forEach 函数对应于 await for
循环,正如 Iterable.forEach 对应于正常的 for
/in
循环一样。像循环一样,它将为每个数据事件调用一个函数并在错误时中断。
更底层的 listen 方法是所有其他方法的基础。您在流上调用 listen
以告诉它您希望接收事件,并注册将接收这些事件的回调。当您调用 listen
时,您接收一个 StreamSubscription 对象,它是提供事件的活动对象,并且可以用来停止再次监听,或者临时暂停订阅的事件。
有两种类型的流:“单订阅”流和“广播”流。
单订阅流 在整个流的生命周期内只允许一个监听器。它在没有监听器的情况下不会开始生成事件,并且在监听器取消订阅时停止发送事件,即使事件源还能提供更多。由 async*
函数创建的流是一个单订阅流,但是函数的每个调用都将创建一个新的这样的流。
不允许在单订阅流上进行两次监听,即使在第一次订阅被取消之后也不允许。
单订阅流通常用于流式传输大块连续数据,例如文件I/O。
广播流 允许任何数量的听众,并且当它们准备好了,不论有没有听众,都会触发事件。
广播流用于独立事件/观察者。
如果多个听众想要监听一个单订阅流,可以使用 asBroadcastStream 在非广播流上创建一个广播流。
在任何一种流上,流转换,如 where 和 skip, 返回和被调用的方法相同的流类型,除非有其他说明。
当事件被触发时,当时的那个听众将接收到事件。如果在事件正在触发时向广播流添加了一个听众,那个听众将不会接收到当前正在触发的事件。如果取消了一个听众,它将立即停止接收事件。在广播流上监听可以视为在 listen 调用发生时还没有发射的事件的新流上监听。例如,first 属性监听流,然后返回听众接收到的第一个事件。这不一定是最先由流发射的事件,但是最先的 剩余 事件。
当 "done" 事件被触发时,在接收到事件之前,订阅者会被注销。事件发送后,流上没有订阅者。在此之后向广播流添加新的订阅者是被允许的,但它们将立即收到一个新的 "done" 事件。
流订阅总是尊重 "pause" 请求。如果必要,它们需要缓存它们的输入,但通常最好是它们简单地请求暂停输入。
isBroadcast 方法的默认实现返回 false。一个继承自 Stream 的广播流必须覆盖 isBroadcast 方法,如果它想要指示它像一个广播流那样运行。
构造函数
- Stream()
-
const
- Stream.empty({@Since("3.2") bool broadcast})
- 创建一个空的广播流。constfactory
- Stream.error(Object error, [StackTrace? stackTrace])
- 创建一个在完成之前先发射一个错误事件的流。factory
-
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<
T> sink)) - 创建一个将现有流的全部事件通过一个汇流转换的流。factory
-
Stream.fromFuture(Future<
T> future - 从future创建一个新的单订阅流。factory
-
Stream.fromFutures(Iterable<
Future< futuresT> > - 从一组future创建一个单订阅流。factory
-
Stream.fromIterable(Iterable<
T> elements - 创建一个从
elements
获取数据的流。factory -
Stream.multi(void onListen(MultiStreamController<
T> ), {bool isBroadcast = false}) - 创建一个多订阅流。factory
- Stream.periodic(Duration period, [T computation(int computationCount)?])
- 创建一个在
period
间隔重复发出事件的流。factory - Stream.value(T value)
- 创建一个在关闭前只发出单次数据事件的流。factory
属性
方法
-
any(
bool test(T element)) → Future<bool> - 检查
test
是否接受此流提供的任何元素。 -
asBroadcastStream(
{void onListen(StreamSubscription<T>)?, void onCancel(StreamSubscription<T>)?}) → Stream<T> - 返回一个多订阅流,它产生与该流相同的事件。
-
asyncExpand<E>(
Stream<E> convert(T event)) → Stream<E> - 将每个元素转换为一串异步事件。
-
asyncMap<E>(
FutureOr<E> convert(T event)) → Stream<E> - 创建一个新流,其中此流的数据事件异步映射到新的事件。
-
cast<R>(
) → Stream<R> - 将此流调整为
Stream<R>
。 -
contains(
Object? needle) → Future<bool> - 返回
needle
是否出现在此流提供的元素中。 -
distinct(
[bool equals(T previous, T next)?]) → Stream<T> - 如果在连续的数据事件中相同,则跳过数据事件。
-
drain<
E> ([E? futureValue]) → Future< E> - 丢弃此流上的所有数据,但会在完成后或发生错误时发出信号。
-
elementAt(
int index) → Future< T> - 返回此流的第
index
个数据事件的值。 -
every(
bool test(T element)) → Future< bool> - 检查
test
是否接受此流提供的所有元素。 -
expand<
S> (Iterable< S> convert(T element)) → Stream<S> - 将此流的每个元素转换为元素序列。
-
firstWhere(
bool test(T element), {T orElse()?}) → Future< T> - 查找与
test
匹配的第一个元素。 -
fold<
S> (S initialValue, S combine(S previous, T element)) → Future< S> - 通过重复应用
combine
将值序列合并。 -
forEach(
void action(T element)) → Future< void> - 对此流的每个元素执行
action
。 -
handleError(
Function onError, {bool test(dynamic error)?}) → Stream< T> - 创建一个包装器流,拦截此流的某些错误。
-
join(
[String separator = ""]) → Future< String> - 将元素的字面字符串表示形式合并为单个字符串。
-
lastWhere(
bool test(T element), {T orElse()?}) → Future< T> - 查找此流中匹配
test
的最后一个元素。 -
listen(
void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}) → StreamSubscription< T> - 为此流添加一个订阅。
-
map<
S> (S convert(T event)) → Stream< S> - 将此流的每个元素转换为新的事件。
-
noSuchMethod(
Invocation invocation) → dynamic - 当访问不存在的方法或属性时被调用。继承
-
pipe(
StreamConsumer< T> streamConsumer) → Future - 将此流的每个事件流入
streamConsumer
。 -
reduce(
T combine(T previous, T element)) → Future< T> - 通过重复应用
combine
将值序列合并。 -
singleWhere(
bool test(T element), {T orElse()?}) → Future< T> - 在此流中查找匹配
test
的单一元素。 -
skip(
int count) → Stream< T> - 跳过来自此流的第一个
count
个数据事件。 -
skipWhile(
bool test(T element)) → Stream< T> - 当数据事件匹配
test
时跳过。 -
take(
int count) → Stream< T> - 提供此流的前最多
count
个数据事件。 -
takeWhile(
bool test(T element)) → Stream< T> - 在
test
成功时转发数据事件。 -
timeout(
Duration timeLimit, {void onTimeout(EventSink< T> sink)?}) → Stream<T> - 创建一个与该流有相同事件的新流。
-
toList(
) → Future< List< T> > - 收集此流的所有元素到一个
List
中。 -
toSet(
) → Future< Set< T> > - 将此流的数据收集到一个
Set
中。 -
toString(
) → String - 此对象的字符串表示形式。继承
-
transform<
S> (StreamTransformer< T, S> streamTransformer) → Stream<S> - 将
streamTransformer
应用到此流。 -
where(
bool test(T event)) → Stream< T> - 从本流创建一个新的流,丢弃一些元素。
运算符
-
operator ==(
Object other) → bool - 等价运算符。继承