Stream<T>抽象 混合

异步数据事件的来源。

Stream提供了一个接收一系列事件的途径。每个事件要么是数据事件,也称为流的元素,要么是错误事件,它表示发生了一些错误。当一个流已发出所有事件时,一个单独的“完成”事件通知监听器已达到末尾。

您通过调用一个async*函数来产生一个流,然后该函数返回一个流。消耗该流将导致该函数发出事件,直到它结束,并且流关闭。您可以使用一个await for循环消耗一个流,该循环在asyncasync*函数内部可用,或者通过在async*函数中使用yield*直接将事件转发。示例

Stream<T> optionalMap<T>(
    Stream<T> source , [T Function(T)? convert]) async* {
  if (convert == null) {
    yield* source;
  } else {
    await for (var event in source) {
      yield convert(event);
    }
  }
}

当调用此函数时,它立即返回一个Stream<T>对象。然后不再发生任何事情,直到有人尝试消耗该流。那时,async*函数的主体开始运行。如果省略了convert函数,则yield*将监听source流并将所有事件(数据、错误等)都转发到返回的流中。当source流关闭时,yield*完成,并且optionalMap函数的主体也结束。这关闭了返回的流。如果提供了convert,则该函数将监听源流并进入一个await for循环,循环反复地等待下一个数据事件。在数据事件上,它调用convert并发出结果。如果源流没有发出错误事件,循环将在源流结束时结束,然后optionalMap函数主体完成,这关闭了返回的流。从源流的一个错误事件,await for重新抛出该错误,打破了循环。由于没有捕获,该错误随后到达optionalMap函数主体的末尾。这使得错误被发射到返回的流中,这导致流关闭。

('.')

forEach 函数对应于 await for 循环,正如 Iterable.forEach 对应于正常的 for/in 循环一样。像循环一样,它将为每个数据事件调用一个函数并在错误时中断。

更底层的 listen 方法是所有其他方法的基础。您在流上调用 listen 以告诉它您希望接收事件,并注册将接收这些事件的回调。当您调用 listen 时,您接收一个 StreamSubscription 对象,它是提供事件的活动对象,并且可以用来停止再次监听,或者临时暂停订阅的事件。

有两种类型的流:“单订阅”流和“广播”流。

单订阅流 在整个流的生命周期内只允许一个监听器。它在没有监听器的情况下不会开始生成事件,并且在监听器取消订阅时停止发送事件,即使事件源还能提供更多。由 async* 函数创建的流是一个单订阅流,但是函数的每个调用都将创建一个新的这样的流。

不允许在单订阅流上进行两次监听,即使在第一次订阅被取消之后也不允许。

单订阅流通常用于流式传输大块连续数据,例如文件I/O。

广播流 允许任何数量的听众,并且当它们准备好了,不论有没有听众,都会触发事件。

广播流用于独立事件/观察者。

如果多个听众想要监听一个单订阅流,可以使用 asBroadcastStream 在非广播流上创建一个广播流。

在任何一种流上,流转换,如 whereskip, 返回和被调用的方法相同的流类型,除非有其他说明。

当事件被触发时,当时的那个听众将接收到事件。如果在事件正在触发时向广播流添加了一个听众,那个听众将不会接收到当前正在触发的事件。如果取消了一个听众,它将立即停止接收事件。在广播流上监听可以视为在 listen 调用发生时还没有发射的事件的新流上监听。例如,first 属性监听流,然后返回听众接收到的第一个事件。这不一定是最先由流发射的事件,但是最先的 剩余 事件。

当 "done" 事件被触发时,在接收到事件之前,订阅者会被注销。事件发送后,流上没有订阅者。在此之后向广播流添加新的订阅者是被允许的,但它们将立即收到一个新的 "done" 事件。

流订阅总是尊重 "pause" 请求。如果必要,它们需要缓存它们的输入,但通常最好是它们简单地请求暂停输入。

isBroadcast 方法的默认实现返回 false。一个继承自 Stream 的广播流必须覆盖 isBroadcast 方法,如果它想要指示它像一个广播流那样运行。

实现者
注释
  • @vmIsolateUnsendable

构造函数

Stream()
const
Stream.empty({@Since("3.2") bool broadcast})
创建一个空的广播流。
const
factory
Stream.error(Object error, [StackTrace? stackTrace])
创建一个在完成之前先发射一个错误事件的流。
factory
Stream.eventTransformed(Stream source, EventSink mapSink(EventSink<T> sink))
创建一个将现有流的全部事件通过一个汇流转换的流。
factory
Stream.fromFuture(Future<T> future
从future创建一个新的单订阅流。
factory
Stream.fromFutures(Iterable<Future<T>> futures
从一组future创建一个单订阅流。
factory
Stream.fromIterable(Iterable<T> elements
创建一个从elements获取数据的流。
factory
Stream.multi(void onListen(MultiStreamController<T>), {bool isBroadcast = false})
创建一个多订阅流。
factory
Stream.periodic(Duration period, [T computation(int computationCount)?])
创建一个在period间隔重复发出事件的流。
factory
Stream.value(T value)
创建一个在关闭前只发出单次数据事件的流。
factory

属性

first Future<T>
此流的第一元素。
无设置器
hashCode int
此对象的哈希码。
无设置器继承
isBroadcast bool
此流是否为广播流。
无设置器
isEmpty Future<bool>
此流是否包含任何元素。
无设置器
last Future<T>
此流的最后一个元素。
无设置器
length Future<int>
此流中元素的数量。
无设置器
runtimeType Type
对象运行时类型的表示。
无设置器继承
single Future<T>
此流的单个元素。
无设置器

方法

any(bool test(T element)) Future<bool>
检查 test 是否接受此流提供的任何元素。
asBroadcastStream({void onListen(StreamSubscription<T>)?, void onCancel(StreamSubscription<T>)?}) Stream<T>
返回一个多订阅流,它产生与该流相同的事件。
asyncExpand<E>(Stream<E> convert(T event)) Stream<E>
将每个元素转换为一串异步事件。
asyncMap<E>(FutureOr<E> convert(T event)) Stream<E>
创建一个新流,其中此流的数据事件异步映射到新的事件。
cast<R>() Stream<R>
将此流调整为 Stream<R>
contains(Object? needle) Future<bool>
返回 needle 是否出现在此流提供的元素中。
distinct([bool equals(T previous, T next)?]) Stream<T>
如果在连续的数据事件中相同,则跳过数据事件。
drain<E>([E? futureValue]) Future<E>
丢弃此流上的所有数据,但会在完成后或发生错误时发出信号。
elementAt(int index) Future<T>
返回此流的第 index 个数据事件的值。
every(bool test(T element)) Future<bool>
检查 test 是否接受此流提供的所有元素。
expand<S>(Iterable<S> convert(T element)) Stream<S>
将此流的每个元素转换为元素序列。
firstWhere(bool test(T element), {T orElse()?}) Future<T>
查找与 test 匹配的第一个元素。
fold<S>(S initialValue, S combine(S previous, T element)) Future<S>
通过重复应用 combine 将值序列合并。
forEach(void action(T element)) Future<void>
对此流的每个元素执行 action
handleError(Function onError, {bool test(dynamic error)?}) Stream<T>
创建一个包装器流,拦截此流的某些错误。
join([String separator = ""]) Future<String>
将元素的字面字符串表示形式合并为单个字符串。
lastWhere(bool test(T element), {T orElse()?}) Future<T>
查找此流中匹配 test 的最后一个元素。
listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}) StreamSubscription<T>
为此流添加一个订阅。
map<S>(S convert(T event)) Stream<S>
将此流的每个元素转换为新的事件。
noSuchMethod(Invocation invocation) → dynamic
当访问不存在的方法或属性时被调用。
继承
pipe(StreamConsumer<T> streamConsumer) Future
将此流的每个事件流入 streamConsumer
reduce(T combine(T previous, T element)) Future<T>
通过重复应用 combine 将值序列合并。
singleWhere(bool test(T element), {T orElse()?}) Future<T>
在此流中查找匹配 test 的单一元素。
skip(int count) Stream<T>
跳过来自此流的第一个 count 个数据事件。
skipWhile(bool test(T element)) Stream<T>
当数据事件匹配 test 时跳过。
take(int count) Stream<T>
提供此流的前最多 count 个数据事件。
takeWhile(bool test(T element)) Stream<T>
test 成功时转发数据事件。
timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) Stream<T>
创建一个与该流有相同事件的新流。
toList() Future<List<T>>
收集此流的所有元素到一个 List 中。
toSet() Future<Set<T>>
将此流的数据收集到一个 Set 中。
toString() String
此对象的字符串表示形式。
继承
transform<S>(StreamTransformer<T, S> streamTransformer) Stream<S>
streamTransformer 应用到此流。
where(bool test(T event)) Stream<T>
从本流创建一个新的流,丢弃一些元素。

运算符

operator ==(Object other) bool
等价运算符。
继承

静态方法

castFrom<S, T>(Stream<S> source) Stream<T>
source 转换为 Stream