Stream<T>抽象 混入

异步数据事件的来源。

Stream提供了一种接收事件序列的方式。每个事件要么是数据事件,也称为流的元素,要么是错误事件,它是一种通知,表明某些东西失败了。当一个流发出所有事件后,一个单一的“完成”事件会通知监听器已到达末尾。

您通过调用一个async*函数来生成一个流,然后该函数返回一个流。消费该流将导致函数发出事件,直到它结束,然后流关闭。您可以通过在asyncasync*函数内部使用await for循环来消费流,或者通过在async*函数内部直接使用yield*来转发其事件。示例

Stream<T> optionalMap<T>(
    Stream<T> source , [T Function(T)? convert]) async* {
  if (convert == null) {
    yield* source;
  } else {
    await for (var event in source) {
      yield convert(event);
    }
  }
}

当调用此函数时,它立即返回一个Stream<T>对象。然后,直到有人尝试消费该流之前,不会发生任何其他操作。在这一点上,async*函数的主体开始运行。如果省略了convert函数,则yield*将监听source流并将所有事件(数据和错误)转发到返回的流。当source流关闭时,yield*完成,并且optionalMap函数主体也结束。这会关闭返回的流。如果提供了convert,则该函数将监听源流并进入一个await for循环,该循环重复等待下一个数据事件。在数据事件上,它使用值调用convert并在返回的流上发出结果。如果没有从源流发出错误事件,则循环在源流结束时结束,然后optionalMap函数主体完成,这会关闭返回的流。从源流来的错误事件将重新抛出该错误,这会打断循环。由于没有捕获该错误,该错误会达到optionalMap函数主体的末尾,这使得错误被发出在返回的流上,然后流关闭。

Stream类还提供了允许您手动监听流事件或将流转换为另一个流或未来的功能。

forEach函数对应于await for循环,就像Iterable.forEach对应于正常的for/in循环一样。像循环一样,它将为每个数据事件调用一个函数,并在错误时中断。

更底层的listen方法是其他所有方法的基础。您在流上调用listen来告诉它您想接收事件,并注册接收这些事件的回调。当您调用listen时,您会收到一个StreamSubscription对象,该对象是提供事件的活跃对象,您可以使用它来停止监听,或者暂时暂停订阅的事件。

有两种类型的流:“单订阅”流和“广播”流。

单订阅流在整个流的生命周期内只允许一个监听器。它在没有监听器的情况下不会开始生成事件,当监听器取消订阅时,即使事件源还能提供更多事件,它也会停止发送事件。由 async* 函数创建的流是单订阅流,但每次调用该函数都会创建一个新的此类流。

不允许在单个订阅流上重复监听,即使第一次订阅已取消。

单订阅流通常用于流式传输较大的连续数据块,如文件I/O。

广播流允许任何数量的监听器,并且无论是否有监听器,都会在事件准备就绪时触发事件。

广播流用于独立的事件/观察者。

如果多个监听器想监听单个订阅流,请使用 asBroadcastStream 在非广播流上创建广播流。

在任意类型的流上,流转换,如 whereskip,返回与调用方法相同的流类型,除非另有说明。

当事件被触发时,当时在该处的监听器将接收到该事件。如果在事件正在触发时向广播流添加监听器,该监听器将不会接收到当前正在触发的事件。如果监听器被取消,它将立即停止接收事件。在广播流上监听可以被视为在 listen 调用发生时,包含尚未发出的事件的新流上监听。例如,first 获取器监听流,然后返回监听器接收到的第一个事件。这不一定是最先由流发出的事件,而是广播流中 剩余 的第一个事件。

当“完成”事件被触发时,订阅者在接收到事件之前被取消订阅。事件发送后,流已没有订阅者。在此之后向广播流添加新的订阅者是允许的,但他们将尽快接收到一个新的“完成”事件。

流订阅始终尊重“暂停”请求。如果需要,他们需要缓冲其输入,但通常,最好它们也可以请求暂停输入。

isBroadcast 的默认实现返回 false。继承自 Stream 的广播流必须覆盖 isBroadcast 以返回 true,如果它想表示其行为类似于广播流。

实现者
注解
  • @vmIsolateUnsendable

构造函数

Stream()
const
Stream.empty({@Since("3.2") bool broadcast})
创建一个空的广播流。
const
factory
Stream.error(Object error, [StackTrace? stackTrace])
创建一个在完成前只发出一个错误事件的流。
factory
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink))
创建一个流,其中现有流的所有事件都通过一个汇流转换。
factory
Stream.fromFuture(Future<T> future)
从未来创建一个新的单订阅流。
factory
Stream.fromFutures(Iterable<Future<T>> futures)
从一组未来创建一个单订阅流。
factory
Stream.fromIterable(Iterable<T> elements)
创建一个从elements获取数据的流。
factory
Stream.multi(void onListen(MultiStreamController<T>), {bool isBroadcast = false})
创建一个多订阅流。
factory
Stream.periodic(Duration period, [T computation(int computationCount)?])
创建一个在period间隔内重复发出事件的流。
factory
Stream.value(T value)
创建一个在关闭前只发出一个数据事件的流。
factory

属性

first Future<T>
此流的第一个元素。
无设置器
hashCode int
此对象的哈希码。
无设置器继承
isBroadcast bool
此流是否为广播流。
无设置器
isEmpty Future<bool>
此流是否包含任何元素。
无设置器
last Future<T>
此流的最后一个元素。
无设置器
length Future<int>
此流中的元素数量。
无设置器
runtimeType Type
对象运行时类型的表示。
无设置器继承
single Future<T>
此流的单个元素。
无设置器

方法

any(bool test(T element)) Future<bool>
检查是否 test 接受此流提供的任何元素。
asBroadcastStream({void onListen(StreamSubscription<T> subscription)?, void onCancel(StreamSubscription<T> subscription)?}) Stream<T>
返回一个可多订阅的流,该流产生与当前流相同的事件。
asyncExpand<E>(Stream<E>? convert(T event)) Stream<E>
将每个元素转换为一组异步事件。
asyncMap<E>(FutureOr<E> convert(T event)) Stream<E>
创建一个新的流,该流的每个数据事件都会异步映射到新的事件。
cast<R>() Stream<R>
将此流调整为 Stream<R>
contains(Object? needle) Future<bool>
返回是否在由该流提供的数据元素中发生 needle
distinct([bool equals(T previous, T next)?]) Stream<T>
如果数据事件与上一个数据事件相等,则跳过数据事件。
drain<E>([E? futureValue]) Future<E>
丢弃此流上的所有数据,但在完成或发生错误时发出信号。
elementAt(int index) Future<T>
返回此流的第 index 个数据事件的值。
every(bool test(T element)) Future<bool>
检查 test 是否接受此流提供的所有元素。
expand<S>(Iterable<S> convert(T element)) Stream<S>
将此流中的每个元素转换为一个元素序列。
firstWhere(bool test(T element), {T orElse()?}) Future<T>
查找与此流中的第一个匹配 test 的元素。
fold<S>(S initialValue, S combine(S previous, T element)) Future<S>
通过反复应用 combine 组合一系列值。
forEach(void action(T element)) Future<void>
对此流中的每个元素执行 action
handleError(Function onError, {bool test(dynamic error)?}) Stream<T>
创建一个包装流,以拦截来自此流的某些错误。
join([String separator = ""]) Future<String>
将元素的字符串表示形式组合成一个单独的字符串。
lastWhere(bool test(T element), {T orElse()?}) Future<T>
查找在此流中匹配 test 的最后一个元素。
listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}) StreamSubscription<T>
为此流添加一个订阅。
map<S>(S convert(T event)) Stream<S>
将此流中的每个元素转换为一个新的流事件。
noSuchMethod(Invocation invocation) → dynamic
当访问不存在的方法或属性时调用。
继承
pipe(StreamConsumer<T> streamConsumer) Future
将此流的每个事件传输到 streamConsumer
reduce(T combine(T previous, T element)) Future<T>
通过反复应用 combine 组合一系列值。
singleWhere(bool test(T element), {T orElse()?}) Future<T>
在此流中查找匹配 test 的单个元素。
skip(int count) Stream<T>
跳过此流中的前 count 个数据事件。
skipWhile(bool test(T element)) Stream<T>
test 匹配的情况下跳过此流中的数据事件。
take(int count) Stream<T>
提供最多此流的前 count 个数据事件。
takeWhile(bool test(T element)) Stream<T>
test 成功时,转发数据事件。
timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) Stream<T>
创建一个新的流,具有与该流相同的事件。
toList() Future<List<T>>
将此流的所有元素收集到一个 List 中。
toSet() Future<Set<T>>
将此流的数据收集到一个 Set 中。
toString() String
此对象的字符串表示。
继承
transform<S>(StreamTransformer<T, S> streamTransformer) Stream<S>
streamTransformer 应用到此流。
where(bool test(T event)) Stream<T>
从当前流创建一个新的流,丢弃一些元素。

操作符

operator ==(Object other) bool
等于操作符。
继承

静态方法

castFrom<S, T>(Stream<S> source) Stream<T>
source 适配为 Stream