应用:创建Executor
Rust中的Future
是惰性(lazy)的:除非受到主动驱动而被完成,否则不会做任何事。一种驱动方式是在async
函数中使用.await
,但是这样只是继续向上层抛出了问题:最终是由谁来运行最顶层的async
函数返回的future呢?答案就是,我们需要一个Future
的executor。
Future
的executor会获取一系列的上层Future
并将它们运行至完成,是通过在Future
有进展时,调用poll
的方式来进行的。一般来说,在一个future开始后,executor就会连续对其进行poll
。当Future
通过调用wake()
来表明,它们已经准备好继续进行 时,就会被放回一个队列中,并再次调用poll
,如此重复进行直到Future
完成。
在这节中,我们要编写一个简单的,负责并发运行大量上层future直到它们完成的executor。
此例子中,我们要用到futures
crate中的ArcWake
trait,这个trait能提供一个更简便的方法来构造一个Waker
。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures = "0.3"
接下来,在src/main.rs
中添加如下import:
use {
futures::{
future::{FutureExt, BoxFuture},
task::{ArcWake, waker_ref},
},
std::{
future::Future,
sync::{Arc, Mutex},
sync::mpsc::{sync_channel, SyncSender, Receiver},
task::{Context, Poll},
time::Duration,
},
// The timer we wrote in the previous section:
timer_future::TimerFuture,
};
我们的executor会通过使task在通道(channel)中运行来工作。executor会从通道中取出事件(event)并运行之。当一个task准备好继续工作时(即被唤醒),就可以通过将自身放回通道的方式来进行调度,使自身再次被轮询(poll)。
在这种设计中,executor只需要保存task通道的接收端。用户会获得发送端,这样他们就能创建新的future了。至于task,就是能够重新调度自身的future。因此我们将其以一个搭配了sender的future的形式存储,以使task能够通过sender来重新使自身进入(管道的)队列。
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
还得在(future的)生成器(spawner)中添加一个能让其创建新future的方法。这个方法会接受一个future类型的参数,把它装箱(box it),并创建一个包含装箱结果的Arc<Task>
,这样future就可以进入executor的(管道的)队列了。
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
为了对future进行轮询,还需要创建一个Waker
。如同在task唤醒那一节中所讨论的,Waker
负责在wake
被调用时,再次调度task使之被轮询。Waker
会告知executor哪一个task已经就绪,并允许它们再次轮询已经准备好继续运行的那些future。最简单的创建Waker
的方法就是实现ArcWake
trait再使用waker_ref
或.into_waker()
函数来将Arc<impl ArcWake>
转换为Waker
。下面为我们的task实现了ArcWake
,使之能够转换为Waker
和被唤醒:
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
}
当从Arc<Task>
创建一个Waker
时,调用wake()
会将一份Arc
的拷贝送入task的通道。我们的executor只需要取出它并对其进行轮询。实现如下:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if let Poll::Pending = future.as_mut().poll(context) {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
恭喜!现在我们就有了一个可工作的future的executor。我们甚至可以用它来运行async
/.await
的代码和自定义的future,比如我们之前编写的TimerFuture
:
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}