Executor和系统IO

在之前的Futuretrait一节中,我们讨论了这个从socket异步读取的future例子:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

这个future会从socket中读取可用数据,如果没有可用数据,就让出到executor,并请求在socket可读时再被唤醒。但是通过这个例子还是没有说清楚Socket类型是如何实现的,特别是set_readable_callback这个函数的工作方式。现在的问题是,如何令wake()在socket可用时立即被调用?一个选项是通过一条线程不停检查socket是否可读,并在合适的时候调用wake(),不过这么干效率太低了,因为每一个被阻塞的IO future都得有一条单独的线程。

实际上,这个问题是通过内置的IO感知系统阻塞原语(IO-aware system blocking primitive)来解决的,比如Linux的epoll,FreeBSD/Mac OS/IOCP on Windows的kqueue以及Fuchsia的port(这些都在Rust的跨平台cratemio里得以暴露)。这些原语都允许一个线程在多个异步IO事件中阻塞,当其中一个事件完成了就返回。API看上去是这样的:

struct IoBlocker {
    /* ... */
}

struct Event {
    // 一个用于识别和监听特定事件的ID
    id: usize,

    // 一个需等待或已完成signal的集合
    signals: Signals,
}

impl IoBlocker {
    /// 创建一系列要阻塞的异步IO事件
    fn new() -> Self { /* ... */ }

    /// 表示对特定IO事件的关注
    fn add_io_event_interest(
        &self,

        /// 事件发生时依附的对象
        io_object: &IoObject,
		/// 为驱动事件,需要在 `io_object`中设置一系列signal
		/// 并与给出的ID搭配工作
        event: Event,
    ) { /* ... */ }

    /// 阻塞并等待事件发生
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// 在某socket可用时输出"Socket 1 is now READABLE"的字样
println!("Socket {:?} is now {:?}", event.id, event.signals);

Future的executor能够利用这些原语来提供异步IO对象,比如能够为socket配置当特定IO事件发生时,要运行的回调函数。在上面SocketRead的例子中,Socket::set_readable_callback的伪代码如下:

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
    	// `local_executor`是一个对本地executor的引用,能用于socket的创建
    	// 但在实际应用中,为了方便起见,在很多executor实现中
    	// 都会将其传递至线程的本地存储
        let local_executor = self.local_executor;

        // 为IO对象设置唯一ID
        let id = self.id;
        
		// 在executor的map中存储本地waker
		// 以使其能在IO事件到来时被调用
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

现在就有一个能和Waker收发IO事件的executor线程了,能唤醒响应task,让executor能在返回处理其他IO事件(继续循环)前,驱动更多的task完成。