Future Trait

Futuretrait是Rust异步编程的核心。一个Future就是一个能够产生值的异步计算过程(即便值可以为空,如())。下面是一个简化版的future trait:


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

可通过调用poll函数来提前执行future,以让future尽早完成。在future完成后,会返回Poll::Ready(result)。若future尚未完成,则会返回Poll::Pending并设置Future有进展时要调用的wake()函数。当wake()被调用时,用于驱动Future的executor会再次调用poll以便使Future能够继续进行。

如果没有wake(),executor就无从得知是否有某个future取得进展,而不得不对future进行轮询。通过wake(),executor就能准确得知哪个future准备好被poll了。

考虑这样的情况,我们需要从一个可能有数据,也可能没有数据的socket中进行读取内容。如果有可用的数据,就可以读取它并返回Poll::Ready(data),但若数据尚未准备好,future就会被阻塞,不再有进展。当这种情况发生时,我们就得注册(register)当数据准备好时要调用的wake,它会告知executor,该future已经准备好继续进行了。一个简单的SocketReadfuture如下:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

这种基于Future的模型使得我们不依赖中间的分配(intermediate allocations)即可组织多个异步的行为。运行多个future或chaining futures可通过免分配状态机实现:

/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}

这个例子展示了如何在不分别分配任务的情况下,使多个future共同运行以提高效率。类似地,多个顺序future也可以接连运行,就像这样:

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

这些例子展示了Futuretrait是如何用于表达异步控制流,而不需借助多个分配对象和多级嵌套回调。解决了基础的控制流问题,现在来说说和实际的Futuretrait的不同之处吧。

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

第一处变动是self的类型不再是&mut Self,而是变成了Pin<&mut Self>。我们会在 之后的章节中讨论固定(pinning)的内容,现在只要知道,它能让我们创建不能变动内容(immovable)的future就行了。这种对象能够存储其各字段的指针,比如struct MyFut { a: i32, ptr_to_a: *const i32 }。固定在async/.await的实现中非常有用。

其次,wake: fn()变成了&mut Context<'_>。在SimpleFuture中,我们通过调用一个函数指针(fn())来告知future的executor,这个future需要被轮询。但是因为fn()只是一个函数指针,它不能储存在Future中被称为wake的数据。

在现实中,像web服务器这样的复杂应用可能会有上千个不同连接,而这些连接的唤醒状况也需要分别管理。Context类型通过提供一个Waker类型值来解决这个问题,这个值能够用于唤醒一个具体的任务(task)。