异步数据事件的来源。
Stream提供了一种接收事件序列的方式。每个事件要么是数据事件,也称为流的元素,要么是错误事件,它是一种通知,表明某些东西失败了。当一个流发出所有事件后,一个单一的“完成”事件会通知监听器已到达末尾。
您通过调用一个async*
函数来生成一个流,然后该函数返回一个流。消费该流将导致函数发出事件,直到它结束,然后流关闭。您可以通过在async
或async*
函数内部使用await for
循环来消费流,或者通过在async*
函数内部直接使用yield*
来转发其事件。示例
Stream<T> optionalMap<T>(
Stream<T> source , [T Function(T)? convert]) async* {
if (convert == null) {
yield* source;
} else {
await for (var event in source) {
yield convert(event);
}
}
}
当调用此函数时,它立即返回一个Stream<T>
对象。然后,直到有人尝试消费该流之前,不会发生任何其他操作。在这一点上,async*
函数的主体开始运行。如果省略了convert
函数,则yield*
将监听source
流并将所有事件(数据和错误)转发到返回的流。当source
流关闭时,yield*
完成,并且optionalMap
函数主体也结束。这会关闭返回的流。如果提供了convert
,则该函数将监听源流并进入一个await for
循环,该循环重复等待下一个数据事件。在数据事件上,它使用值调用convert
并在返回的流上发出结果。如果没有从源流发出错误事件,则循环在源流结束时结束,然后optionalMap
函数主体完成,这会关闭返回的流。从源流来的错误事件将重新抛出该错误,这会打断循环。由于没有捕获该错误,该错误会达到optionalMap
函数主体的末尾,这使得错误被发出在返回的流上,然后流关闭。
Stream
类还提供了允许您手动监听流事件或将流转换为另一个流或未来的功能。
forEach函数对应于await for
循环,就像Iterable.forEach对应于正常的for
/in
循环一样。像循环一样,它将为每个数据事件调用一个函数,并在错误时中断。
更底层的listen方法是其他所有方法的基础。您在流上调用listen
来告诉它您想接收事件,并注册接收这些事件的回调。当您调用listen
时,您会收到一个StreamSubscription对象,该对象是提供事件的活跃对象,您可以使用它来停止监听,或者暂时暂停订阅的事件。
有两种类型的流:“单订阅”流和“广播”流。
单订阅流在整个流的生命周期内只允许一个监听器。它在没有监听器的情况下不会开始生成事件,当监听器取消订阅时,即使事件源还能提供更多事件,它也会停止发送事件。由 async*
函数创建的流是单订阅流,但每次调用该函数都会创建一个新的此类流。
不允许在单个订阅流上重复监听,即使第一次订阅已取消。
单订阅流通常用于流式传输较大的连续数据块,如文件I/O。
广播流允许任何数量的监听器,并且无论是否有监听器,都会在事件准备就绪时触发事件。
广播流用于独立的事件/观察者。
如果多个监听器想监听单个订阅流,请使用 asBroadcastStream 在非广播流上创建广播流。
在任意类型的流上,流转换,如 where 和 skip,返回与调用方法相同的流类型,除非另有说明。
当事件被触发时,当时在该处的监听器将接收到该事件。如果在事件正在触发时向广播流添加监听器,该监听器将不会接收到当前正在触发的事件。如果监听器被取消,它将立即停止接收事件。在广播流上监听可以被视为在 listen 调用发生时,包含尚未发出的事件的新流上监听。例如,first 获取器监听流,然后返回监听器接收到的第一个事件。这不一定是最先由流发出的事件,而是广播流中 剩余 的第一个事件。
当“完成”事件被触发时,订阅者在接收到事件之前被取消订阅。事件发送后,流已没有订阅者。在此之后向广播流添加新的订阅者是允许的,但他们将尽快接收到一个新的“完成”事件。
流订阅始终尊重“暂停”请求。如果需要,他们需要缓冲其输入,但通常,最好它们也可以请求暂停输入。
isBroadcast 的默认实现返回 false。继承自 Stream 的广播流必须覆盖 isBroadcast 以返回 true
,如果它想表示其行为类似于广播流。
构造函数
- Stream()
-
const
- Stream.empty({@Since("3.2") bool broadcast})
- 创建一个空的广播流。constfactory
- Stream.error(Object error, [StackTrace? stackTrace])
- 创建一个在完成前只发出一个错误事件的流。factory
-
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<
T> sink)) - 创建一个流,其中现有流的所有事件都通过一个汇流转换。factory
-
Stream.fromFuture(Future<
T> future) - 从未来创建一个新的单订阅流。factory
-
Stream.fromFutures(Iterable<
Future< futures)T> > - 从一组未来创建一个单订阅流。factory
-
Stream.fromIterable(Iterable<
T> elements) - 创建一个从
elements
获取数据的流。factory -
Stream.multi(void onListen(MultiStreamController<
T> ), {bool isBroadcast = false}) - 创建一个多订阅流。factory
- Stream.periodic(Duration period, [T computation(int computationCount)?])
- 创建一个在
period
间隔内重复发出事件的流。factory - Stream.value(T value)
- 创建一个在关闭前只发出一个数据事件的流。factory
属性
方法
-
any(
bool test(T element)) → Future< bool> - 检查是否
test
接受此流提供的任何元素。 -
asBroadcastStream(
{void onListen(StreamSubscription< T> subscription)?, void onCancel(StreamSubscription<T> subscription)?}) → Stream<T> - 返回一个可多订阅的流,该流产生与当前流相同的事件。
-
asyncExpand<
E> (Stream< E> ? convert(T event)) → Stream<E> - 将每个元素转换为一组异步事件。
-
asyncMap<
E> (FutureOr< E> convert(T event)) → Stream<E> - 创建一个新的流,该流的每个数据事件都会异步映射到新的事件。
-
cast<
R> () → Stream< R> - 将此流调整为
Stream<R>
。 -
contains(
Object? needle) → Future< bool> - 返回是否在由该流提供的数据元素中发生
needle
。 -
distinct(
[bool equals(T previous, T next)?]) → Stream< T> - 如果数据事件与上一个数据事件相等,则跳过数据事件。
-
drain<
E> ([E? futureValue]) → Future< E> - 丢弃此流上的所有数据,但在完成或发生错误时发出信号。
-
elementAt(
int index) → Future< T> - 返回此流的第
index
个数据事件的值。 -
every(
bool test(T element)) → Future< bool> - 检查
test
是否接受此流提供的所有元素。 -
expand<
S> (Iterable< S> convert(T element)) → Stream<S> - 将此流中的每个元素转换为一个元素序列。
-
firstWhere(
bool test(T element), {T orElse()?}) → Future< T> - 查找与此流中的第一个匹配
test
的元素。 -
fold<
S> (S initialValue, S combine(S previous, T element)) → Future< S> - 通过反复应用
combine
组合一系列值。 -
forEach(
void action(T element)) → Future< void> - 对此流中的每个元素执行
action
。 -
handleError(
Function onError, {bool test(dynamic error)?}) → Stream< T> - 创建一个包装流,以拦截来自此流的某些错误。
-
join(
[String separator = ""]) → Future< String> - 将元素的字符串表示形式组合成一个单独的字符串。
-
lastWhere(
bool test(T element), {T orElse()?}) → Future< T> - 查找在此流中匹配
test
的最后一个元素。 -
listen(
void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}) → StreamSubscription< T> - 为此流添加一个订阅。
-
map<
S> (S convert(T event)) → Stream< S> - 将此流中的每个元素转换为一个新的流事件。
-
noSuchMethod(
Invocation invocation) → dynamic - 当访问不存在的方法或属性时调用。继承
-
pipe(
StreamConsumer< T> streamConsumer) → Future - 将此流的每个事件传输到
streamConsumer
。 -
reduce(
T combine(T previous, T element)) → Future< T> - 通过反复应用
combine
组合一系列值。 -
singleWhere(
bool test(T element), {T orElse()?}) → Future< T> - 在此流中查找匹配
test
的单个元素。 -
skip(
int count) → Stream< T> - 跳过此流中的前
count
个数据事件。 -
skipWhile(
bool test(T element)) → Stream< T> - 在
test
匹配的情况下跳过此流中的数据事件。 -
take(
int count) → Stream< T> - 提供最多此流的前
count
个数据事件。 -
takeWhile(
bool test(T element)) → Stream< T> - 当
test
成功时,转发数据事件。 -
timeout(
Duration timeLimit, {void onTimeout(EventSink< T> sink)?}) → Stream<T> - 创建一个新的流,具有与该流相同的事件。
-
toList(
) → Future< List< T> > - 将此流的所有元素收集到一个 List 中。
-
toSet(
) → Future< Set< T> > - 将此流的数据收集到一个 Set 中。
-
toString(
) → String - 此对象的字符串表示。继承
-
transform<
S> (StreamTransformer< T, S> streamTransformer) → Stream<S> - 将
streamTransformer
应用到此流。 -
where(
bool test(T event)) → Stream< T> - 从当前流创建一个新的流,丢弃一些元素。
操作符
-
operator ==(
Object other) → bool - 等于操作符。继承