通过Waker
唤醒Task
future在第一次被poll
时尚未完成相当常见。此时,future需要确保它在有新进展时能够再次被轮询到,可通过Waker
类型来实现。
每次一个future被轮询时,其实是作为一个"task"的一部分来进行的。task是被提交给executor的各个上层future。
Waker
提供了一个wake()
方法,能够用于告知executor其关联的task需被唤醒。当wake()
被调用时,executor就知道与Waker
关联的task已有进展,而future则会再次被轮询。
Waker
也实现了clone()
,因此它可被复制再储存。
来试试通过Waker
实现一个简单的定时器吧。
应用:创建定时器
由于只是作为例子,我们要做的就是在定时器创建时开辟一条新的线程,使其休眠所需的时间,然后在时间窗口结束时向计时器发出信号。
这是开始时要添加的import:
#![allow(unused)] fn main() { use { std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }, }; }
首先要定义future类型。future需要一个方法来告知线程:定时器时间已到,本future需要被完成。于是用一个共享的Arc<Mutex<..>>
值来使线程和future进行交流。
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
struct SharedState {
/// Whether or not the sleep time has elapsed
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
waker: Option<Waker>,
}
现在来编写Future
的具体实现吧!
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
是不是很简单?当线程设置shared_state.completed = true
时,就完成了!否则,我们就从当前的task把Waker
进行clone,并传递至shared_state.waker
,这样线程就能唤醒task了。
重要的是,每次future被轮询都要对Waker
进行更新,因为future可能已经转移至另一个有不同Waker
的task中了。这可能发生在future在被轮询后,在不同task间传递的过程中。
最后是用于构造计时器和启动线程的API:
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
这就是创建一个简单的计时器future的全过程了。现在,只差一个用于运行future的executor了。