应用:创建Executor

Rust中的Future是惰性(lazy)的:除非受到主动驱动而被完成,否则不会做任何事。一种驱动方式是在async函数中使用.await,但是这样只是继续向上层抛出了问题:最终是由谁来运行最顶层的async函数返回的future呢?答案就是,我们需要一个Future的executor。

Future的executor会获取一系列的上层Future并将它们运行至完成,是通过在Future有进展时,调用poll的方式来进行的。一般来说,在一个future开始后,executor就会连续对其进行poll。当Future通过调用wake()来表明,它们已经准备好继续进行 时,就会被放回一个队列中,并再次调用poll,如此重复进行直到Future完成。

在这节中,我们要编写一个简单的,负责并发运行大量上层future直到它们完成的executor。

此例子中,我们要用到futurescrate中的ArcWaketrait,这个trait能提供一个更简便的方法来构造一个Waker

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

接下来,在src/main.rs中添加如下import:

use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

我们的executor会通过使task在通道(channel)中运行来工作。executor会从通道中取出事件(event)并运行之。当一个task准备好继续工作时(即被唤醒),就可以通过将自身放回通道的方式来进行调度,使自身再次被轮询(poll)。

在这种设计中,executor只需要保存task通道的接收端。用户会获得发送端,这样他们就能创建新的future了。至于task,就是能够重新调度自身的future。因此我们将其以一个搭配了sender的future的形式存储,以使task能够通过sender来重新使自身进入(管道的)队列。

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

还得在(future的)生成器(spawner)中添加一个能让其创建新future的方法。这个方法会接受一个future类型的参数,把它装箱(box it),并创建一个包含装箱结果的Arc<Task>,这样future就可以进入executor的(管道的)队列了。

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

为了对future进行轮询,还需要创建一个Waker。如同在task唤醒那一节中所讨论的,Waker负责在wake被调用时,再次调度task使之被轮询。Waker会告知executor哪一个task已经就绪,并允许它们再次轮询已经准备好继续运行的那些future。最简单的创建Waker的方法就是实现ArcWaketrait再使用waker_ref.into_waker()函数来将Arc<impl ArcWake>转换为Waker。下面为我们的task实现了ArcWake,使之能够转换为Waker和被唤醒:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}

当从Arc<Task>创建一个Waker时,调用wake()会将一份Arc的拷贝送入task的通道。我们的executor只需要取出它并对其进行轮询。实现如下:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

恭喜!现在我们就有了一个可工作的future的executor。我们甚至可以用它来运行async/.await的代码和自定义的future,比如我们之前编写的TimerFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}