asyncExpand<E> 方法

Stream<E> asyncExpand<E>(
  1. Stream<E>? convert(
    1. T event
    )
)

将每个元素转换成一系列异步事件。

返回一个新的流,并对该流的每个事件执行以下操作

  • 如果事件是错误事件或完成事件,它将直接由返回的流发出。
  • 否则它是一个元素。然后使用该元素作为参数调用 convert 函数,以生成该元素的转换流。
  • 如果该调用抛出异常,错误将发出在返回的流上。
  • 如果调用返回 null,则不对元素执行任何进一步操作。
  • 否则,该流被暂停,并监听转换流。转换流产生的每个数据和错误事件都按产生的顺序发出在返回的流上。当转换流结束时,此流将恢复。

如果此流是广播流,则返回的流也是广播流。

实现

Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
  _StreamControllerBase<E> controller;
  if (isBroadcast) {
    controller = _SyncBroadcastStreamController<E>(null, null);
  } else {
    controller = _SyncStreamController<E>(null, null, null, null);
  }

  controller.onListen = () {
    StreamSubscription<T> subscription = this.listen(null,
        onError: controller._addError, // Avoid Zone error replacement.
        onDone: controller.close);
    subscription.onData((T event) {
      Stream<E>? newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    });
    controller.onCancel = subscription.cancel;
    if (!isBroadcast) {
      controller
        ..onPause = subscription.pause
        ..onResume = subscription.resume;
    }
  };
  return controller.stream;
}