asyncExpand<E> 方法

Stream<E> asyncExpand<E>(
  1. Stream<E>? convert(
    1. T event
    )
)

将每个元素转换为异步事件序列。

返回一个新的流,并对该流中的每个事件执行以下操作

  • 如果事件是错误事件或完成事件,它将直接由返回的流发射。
  • 否则它是一个元素。然后使用元素作为参数调用 convert 函数,以生成该元素的转换流。
  • 如果调用抛出异常,错误将在返回的流中发射。
  • 如果调用返回 null,则不会对元素采取任何进一步的行动。
  • 否则,该流将暂停,并将监听转换流。转换流中的每个数据和错误事件都将按生成顺序在返回的流中发射。当转换流结束时,此流将继续。

如果此流是广播流,则返回的流也是广播流。

实现

Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
  _StreamControllerBase<E> controller;
  if (isBroadcast) {
    controller = _SyncBroadcastStreamController<E>(null, null);
  } else {
    controller = _SyncStreamController<E>(null, null, null, null);
  }

  controller.onListen = () {
    StreamSubscription<T> subscription = this.listen(null,
        onError: controller._addError, // Avoid Zone error replacement.
        onDone: controller.close);
    subscription.onData((T event) {
      Stream<E>? newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    });
    controller.onCancel = subscription.cancel;
    if (!isBroadcast) {
      controller
        ..onPause = subscription.pause
        ..onResume = subscription.resume;
    }
  };
  return controller.stream;
}