执行器与系统IO

在前面Future特质小节,我们讨论了一个对socket进行异步读取的future例子:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // `socket` 有数据的时候将其读取并放置在缓冲区并返回.
            Poll::Ready(self.socket.read_buf())
        } else {
            // `socket` 还没有数据.
            //
            // 当数据来到,将调用 `wake`.
            // 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

这个future会读取socket上的可用数据,如果没有数据可用,那它会让出执行器,请求在socket 再次可读时唤醒它的任务。然而,从这个例子里我们不能很清楚知道Socket类型是如何实现的, 尤其是不了解set_readable_callback函数如何工作。我们要怎样安排lw.wake()方法在一旦 socket变得可用时调用一次呢?一个方案就是让一个线程持续不断地检查socket是否可读,然后 可读时马上调用wake()。然而,这样子效率太低了,那对于每一个阻塞IO的future我们都需要 独立的线程。这会大大降低我们异步代码的效率。

实践时,这个问题是通过整合IO感知系统阻塞元件(IO-aware system blocking primitive), 像Linux上的epoll, FreeBSD和Mac OS的kqueue, Windows的IOCP,以及Fuchsia的poart (以上这些都通过了跨平台Rust库mio暴露出来)。这个元件全都允许线程阻塞多个异步IO事件, 一旦这些事件中有一个完成了,元件就会返回。实际上,这些API通常看着像这样:


# #![allow(unused_variables)]
#fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { ... }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { ... }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
#}

future执行器能够用这些元件来提供异步IO对象,例如可配置回调在特定IO时间出现时执行的socket。 像上面我们的SocketRead例子,Socket::set_readable_callback函数可能看起来像以下伪代码:


# #![allow(unused_variables)]
#fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
#}

我们现在只需要一个执行器线程来接收并分发任何IO事件给特定Waker,这些Waker会唤醒相应 任务,允许执行器在返回来检查更多IO事件之前,驱动更多任务完成。