通过Waker唤醒Task

future在第一次被poll时尚未完成相当常见。此时,future需要确保它在有新进展时能够再次被轮询到,可通过Waker类型来实现。

每次一个future被轮询时,其实是作为一个"task"的一部分来进行的。task是被提交给executor的各个上层future。

Waker提供了一个wake()方法,能够用于告知executor其关联的task需被唤醒。当wake()被调用时,executor就知道与Waker关联的task已有进展,而future则会再次被轮询。

Waker也实现了clone(),因此它可被复制再储存。

来试试通过Waker实现一个简单的定时器吧。

应用:创建定时器

由于只是作为例子,我们要做的就是在定时器创建时开辟一条新的线程,使其休眠所需的时间,然后在时间窗口结束时向计时器发出信号。

这是开始时要添加的import:


#![allow(unused)]
fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
}

首先要定义future类型。future需要一个方法来告知线程:定时器时间已到,本future需要被完成。于是用一个共享的Arc<Mutex<..>>值来使线程和future进行交流。

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

现在来编写Future的具体实现吧!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

是不是很简单?当线程设置shared_state.completed = true时,就完成了!否则,我们就从当前的task把Waker进行clone,并传递至shared_state.waker,这样线程就能唤醒task了。

重要的是,每次future被轮询都要对Waker进行更新,因为future可能已经转移至另一个有不同Waker的task中了。这可能发生在future在被轮询后,在不同task间传递的过程中。

最后是用于构造计时器和启动线程的API:

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

这就是创建一个简单的计时器future的全过程了。现在,只差一个用于运行future的executor了。