执行器与系统IO
在前面 Future
trait小节,我们讨论了一个对 socket 进行异步读取的 future 例子:
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// `socket` 有数据的时候将其读取并放置在缓冲区并返回.
Poll::Ready(self.socket.read_buf())
} else {
// `socket` 还没有数据.
//
// 当数据来到,将调用 `wake`.
// 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
这个 future 会读取 socket 上的可用数据,如果没有数据可用,那它会让出执行器,请求在 socket 再次可读时唤醒它的任务。然而,从这个例子里我们不能很清楚知道 Socket
类型是如何实现的, 尤其是不了解 set_readable_callback
函数如何工作。我们要怎样安排 wake()
方法在一旦
socket 变得可用时调用一次呢?一个方案就是让一个线程持续不断地检查 socket
是否可读,然后 可读时马上调用 wake()
。然而,这样子效率太低了,那对于每一个阻塞IO的 future 我们都需要 独立的线程。这会大大降低我们异步代码的效率。
实践时,这个问题是通过整合IO感知系统阻塞元件(IO-aware system blocking primitive), 像Linux上的 epoll
, FreeBSD 和 Mac OS 的 kqueue
, Windows 的 IOCP,以及 Fuchsia 的 poart
(以上这些都通过了跨平台 Rust 库 mio
暴露出来)。这个元件全都允许线程阻塞多个异步IO事件, 一旦这些事件中有一个完成了,元件就会返回。实际上,这些API通常看着像这样:
struct IoBlocker {
/* ... */
}
struct Event {
// An ID uniquely identifying the event that occurred and was listened for.
id: usize,
// A set of signals to wait for, or which occurred.
signals: Signals,
}
impl IoBlocker {
/// Create a new collection of asynchronous IO events to block on.
fn new() -> Self { /* ... */ }
/// Express an interest in a particular IO event.
fn add_io_event_interest(
&self,
/// The object on which the event will occur
io_object: &IoObject,
/// A set of signals that may appear on the `io_object` for
/// which an event should be triggered, paired with
/// an ID to give to events that result from this interest.
event: Event,
) { /* ... */ }
/// Block until one of the events occurs.
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
future 执行器能够用这些元件来提供异步IO对象,例如可配置回调在特定IO事件出现时执行的 socket。 像上面我们的 SocketRead
例子,Socket::set_readable_callback
函数可能看起来像以下伪代码:
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
// `local_executor` is a reference to the local executor.
// this could be provided at creation of the socket, but in practice
// many executor implementations pass it down through thread local
// storage for convenience.
let local_executor = self.local_executor;
// Unique ID for this IO object.
let id = self.id;
// Store the local waker in the executor's map so that it can be called
// once the IO event arrives.
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
我们现在只需要一个执行器线程来接收并分发任何IO事件给特定 Waker
,这些 Waker
会唤醒相应 任务,允许执行器在返回来检查更多IO事件之前,驱动更多任务完成。