Future
Trait
Future
trait是Rust异步编程的核心。一个Future
就是一个能够产生值的异步计算过程(即便值可以为空,如()
)。下面是一个简化版的future trait:
#![allow(unused)] fn main() { trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } }
可通过调用poll
函数来提前执行future,以让future尽早完成。在future完成后,会返回Poll::Ready(result)
。若future尚未完成,则会返回Poll::Pending
并设置Future
有进展时要调用的wake()
函数。当wake()
被调用时,用于驱动Future
的executor会再次调用poll
以便使Future
能够继续进行。
如果没有wake()
,executor就无从得知是否有某个future取得进展,而不得不对future进行轮询。通过wake()
,executor就能准确得知哪个future准备好被poll
了。
考虑这样的情况,我们需要从一个可能有数据,也可能没有数据的socket中进行读取内容。如果有可用的数据,就可以读取它并返回Poll::Ready(data)
,但若数据尚未准备好,future就会被阻塞,不再有进展。当这种情况发生时,我们就得注册(register)当数据准备好时要调用的wake
,它会告知executor,该future已经准备好继续进行了。一个简单的SocketRead
future如下:
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data-- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
这种基于Future
的模型使得我们不依赖中间的分配(intermediate allocations)即可组织多个异步的行为。运行多个future或chaining futures可通过免分配状态机实现:
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
// Each field may contain a future that should be run to completion.
// If the future has already completed, the field is set to `None`.
// This prevents us from polling a future after it has completed, which
// would violate the contract of the `Future` trait.
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// Attempt to complete future `a`.
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// Attempt to complete future `b`.
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// Both futures have completed-- we can return successfully
Poll::Ready(())
} else {
// One or both futures returned `Poll::Pending` and still have
// work to do. They will call `wake()` when progress can be made.
Poll::Pending
}
}
}
这个例子展示了如何在不分别分配任务的情况下,使多个future共同运行以提高效率。类似地,多个顺序future也可以接连运行,就像这样:
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// We've completed the first future-- remove it and start on
// the second!
Poll::Ready(()) => self.first.take(),
// We couldn't yet complete the first future.
Poll::Pending => return Poll::Pending,
};
}
// Now that the first future is done, attempt to complete the second.
self.second.poll(wake)
}
}
这些例子展示了Future
trait是如何用于表达异步控制流,而不需借助多个分配对象和多级嵌套回调。解决了基础的控制流问题,现在来说说和实际的Future
trait的不同之处吧。
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
第一处变动是self
的类型不再是&mut Self
,而是变成了Pin<&mut Self>
。我们会在 之后的章节中讨论固定(pinning)的内容,现在只要知道,它能让我们创建不能变动内容(immovable)的future就行了。这种对象能够存储其各字段的指针,比如struct MyFut { a: i32, ptr_to_a: *const i32 }
。固定在async
/.await
的实现中非常有用。
其次,wake: fn()
变成了&mut Context<'_>
。在SimpleFuture
中,我们通过调用一个函数指针(fn()
)来告知future的executor,这个future需要被轮询。但是因为fn()
只是一个函数指针,它不能储存在Future
中被称为wake
的数据。
在现实中,像web服务器这样的复杂应用可能会有上千个不同连接,而这些连接的唤醒状况也需要分别管理。Context
类型通过提供一个Waker
类型值来解决这个问题,这个值能够用于唤醒一个具体的任务(task)。