timeout 方法

Stream<T> timeout(
  1. Duration timeLimit,
  2. {void onTimeout(
    1. EventSink<T> sink
    )?}
)

创建一个新的流,拥有与当前流相同的事件。

当有人监听返回的流,并且超过 timeLimit 没有事件被这个流触发时,会调用 onTimeout 函数,然后可以在返回的流上发出更多事件。

当返回的流开始监听时,开始倒计时;当从这个流发出事件,或者监听返回的流被暂停并恢复时,倒计时会重新启动。当监听返回的流被暂停或取消时,倒计时停止。当倒计时完成并调用 onTimeout 函数时,即使发出事件,也不会启动新的一次倒计时。如果此流的连续事件之间有多个 timeLimit 延迟,则事件之间最多只能发生一次超时。

onTimeout 函数将使用一个参数调用:一个 EventSink,允许向返回的流中添加事件。此 EventSink 仅在调用 onTimeout 时有效。在将 EventSink.close 调用传递到 onTimeout 时,将关闭返回的流,并且不再处理任何事件。

如果省略了 onTimeout,超时会将 TimeoutException 发送至返回流的错误通道。如果调用 onTimeout 抛出异常,错误将作为错误发送到返回流。

如果此流是广播流,则返回的流也是一个广播流。如果多次监听广播流,每个订阅都将有其独立的计时器,该计时器在监听时启动计数,并且可以单独暂停订阅的计时器。

示例

Future<String> waitTask() async {
  return await Future.delayed(
      const Duration(seconds: 4), () => 'Complete');
}
final stream = Stream<String>.fromFuture(waitTask())
    .timeout(const Duration(seconds: 2), onTimeout: (controller) {
  print('TimeOut occurred');
  controller.close();
});

stream.listen(print, onDone: () => print('Done'));

// Outputs:
// TimeOut occurred
// Done

实现

Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {
  _StreamControllerBase<T> controller;
  if (isBroadcast) {
    controller = new _SyncBroadcastStreamController<T>(null, null);
  } else {
    controller = new _SyncStreamController<T>(null, null, null, null);
  }

  Zone zone = Zone.current;
  // Register callback immediately.
  _TimerCallback timeoutCallback;
  if (onTimeout == null) {
    timeoutCallback = () {
      controller.addError(
          new TimeoutException("No stream event", timeLimit), null);
    };
  } else {
    var registeredOnTimeout =
        zone.registerUnaryCallback<void, EventSink<T>>(onTimeout);
    var wrapper = new _ControllerEventSinkWrapper<T>(null);
    timeoutCallback = () {
      wrapper._sink = controller; // Only valid during call.
      zone.runUnaryGuarded(registeredOnTimeout, wrapper);
      wrapper._sink = null;
    };
  }

  // All further setup happens inside `onListen`.
  controller.onListen = () {
    Timer timer = zone.createTimer(timeLimit, timeoutCallback);
    var subscription = this.listen(null);
    // Set up event forwarding. Each data or error event resets the timer
    subscription
      ..onData((T event) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        // Controller is synchronous, and the call might close the stream
        // and cancel the timer,
        // so create the Timer before calling into add();
        // issue: https://github.com/dart-lang/sdk/issues/37565
        controller.add(event);
      })
      ..onError((Object error, StackTrace stackTrace) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        controller._addError(
            error, stackTrace); // Avoid Zone error replacement.
      })
      ..onDone(() {
        timer.cancel();
        controller.close();
      });
    // Set up further controller callbacks.
    controller.onCancel = () {
      timer.cancel();
      return subscription.cancel();
    };
    if (!isBroadcast) {
      controller
        ..onPause = () {
          timer.cancel();
          subscription.pause();
        }
        ..onResume = () {
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeoutCallback);
        };
    }
  };

  return controller.stream;
}