asyncMap<E> 方法

Stream<E> asyncMap<E>(
  1. FutureOr<E> convert(
    1. T event
    )
)

创建一个新的流,其中每个数据事件都会异步映射到新的事件。

这种行为类似于 map,因为在每个数据事件上都会调用一次 convert 函数,但在这里 convert 可能有异步操作并返回一个 Future。如果发生这种情况,这个流将在继续处理其他事件之前等待该 future 完成。

如果这个流是一个广播流,则返回的流也是一个广播流。

实现

Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
  _StreamControllerBase<E> controller;
  if (isBroadcast) {
    controller = _SyncBroadcastStreamController<E>(null, null);
  } else {
    controller = _SyncStreamController<E>(null, null, null, null);
  }

  controller.onListen = () {
    StreamSubscription<T> subscription = this.listen(null,
        onError: controller._addError, // Avoid Zone error replacement.
        onDone: controller.close);
    FutureOr<Null> add(E value) {
      controller.add(value);
    }

    final addError = controller._addError;
    final resume = subscription.resume;
    subscription.onData((T event) {
      FutureOr<E> newValue;
      try {
        newValue = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newValue is Future<E>) {
        subscription.pause();
        newValue.then(add, onError: addError).whenComplete(resume);
      } else {
        controller.add(newValue);
      }
    });
    controller.onCancel = subscription.cancel;
    if (!isBroadcast) {
      controller
        ..onPause = subscription.pause
        ..onResume = resume;
    }
  };
  return controller.stream;
}