timeout 方法
创建一个新的流,它具有与当前流相同的的事件。
当有人监听返回的流,且超过 timeLimit
的时间没有从当前流中发出任何事件时,会调用 onTimeout
函数,然后可以在此返回的流上进一步发出事件。
倒计时从返回的流被监听时开始,当从当前流中发出事件,或者当监听返回的流被暂停和恢复时重新启动。当监听返回的流被暂停或取消时停止倒计时。当倒计时完成并调用 onTimeout
函数时,即使发出事件,也不会开始新的倒计时。如果此流的两个事件之间的延迟是多个 timeLimit
,则事件之间最多发生一次超时。
onTimeout
函数会带有一个参数:一个允许将事件放入返回流的 EventSink。此 EventSink
只在 onTimeout
调用期间有效。在 onTimeout
传递给 EventSink.close 的 sink 上调用关闭,将关闭返回的流,并且不会处理进一步的事件。
如果省略了 onTimeout
,超时会将一个 TimeoutException 发送到返回流的错误通道。如果 onTimeout
的调用抛出异常,错误会作为返回流的错误发出。
如果此流是广播流,则返回的流也是广播流。如果广播流被多次监听,每个订阅将有自己的计时器,该计时器在监听时开始计时,订阅的计时器可以单独暂停。
示例
Future<String> waitTask() async {
return await Future.delayed(
const Duration(seconds: 4), () => 'Complete');
}
final stream = Stream<String>.fromFuture(waitTask())
.timeout(const Duration(seconds: 2), onTimeout: (controller) {
print('TimeOut occurred');
controller.close();
});
stream.listen(print, onDone: () => print('Done'));
// Outputs:
// TimeOut occurred
// Done
实现
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {
_StreamControllerBase<T> controller;
if (isBroadcast) {
controller = new _SyncBroadcastStreamController<T>(null, null);
} else {
controller = new _SyncStreamController<T>(null, null, null, null);
}
Zone zone = Zone.current;
// Register callback immediately.
_TimerCallback timeoutCallback;
if (onTimeout == null) {
timeoutCallback = () {
controller.addError(
new TimeoutException("No stream event", timeLimit), null);
};
} else {
var registeredOnTimeout =
zone.registerUnaryCallback<void, EventSink<T>>(onTimeout);
var wrapper = new _ControllerEventSinkWrapper<T>(null);
timeoutCallback = () {
wrapper._sink = controller; // Only valid during call.
zone.runUnaryGuarded(registeredOnTimeout, wrapper);
wrapper._sink = null;
};
}
// All further setup happens inside `onListen`.
controller.onListen = () {
Timer timer = zone.createTimer(timeLimit, timeoutCallback);
var subscription = this.listen(null);
// Set up event forwarding. Each data or error event resets the timer
subscription
..onData((T event) {
timer.cancel();
timer = zone.createTimer(timeLimit, timeoutCallback);
// Controller is synchronous, and the call might close the stream
// and cancel the timer,
// so create the Timer before calling into add();
// issue: https://github.com/dart-lang/sdk/issues/37565
controller.add(event);
})
..onError((Object error, StackTrace stackTrace) {
timer.cancel();
timer = zone.createTimer(timeLimit, timeoutCallback);
controller._addError(
error, stackTrace); // Avoid Zone error replacement.
})
..onDone(() {
timer.cancel();
controller.close();
});
// Set up further controller callbacks.
controller.onCancel = () {
timer.cancel();
return subscription.cancel();
};
if (!isBroadcast) {
controller
..onPause = () {
timer.cancel();
subscription.pause();
}
..onResume = () {
subscription.resume();
timer = zone.createTimer(timeLimit, timeoutCallback);
};
}
};
return controller.stream;
}