Future特质

Future特质是Rust异步编程中心内容。它是一种异步计算,可以产生值(尽管这个值可以为空, 如())。简化版future特质看起来可能像这样:


# #![allow(unused_variables)]
#fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
#}

Future能通过调用poll的方式推进,这会尽可能地推进future到完成状态。如果future完成了, 那就会返回poll::Ready(result)。如果future尚未完成,则返回poll::Pending,并且安排 wake()函数在Future准备好进一步执行时调用(译者注:注册回调函数)。当wake()调用 时,驱动Future的执行器会再次poll使得Future有所进展。

没有wake()的话,执行器将无从获知一个future是否能有所进展,并且会持续轮询(polling) 所以future。但有了wake()函数,执行器就能知道哪些future已经准备好轮询了。

例如,考虑一下场景:我们准备读取一个套接字(socket),它可能还没有用数据返回。如果它有 数据了,我们可以读取数据并返回poll::Ready(data),但如果没有数据准备好,我们这个future 就会阻塞并且不能继续进行。当没有数据可用时,我们必须注册wake函数,告诉执行器我们的 future准备好进一步操作。一个简单的SocketReadfuture可能像这样:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // `socket` 有数据的时候将其读取并放置在缓冲区并返回.
            Poll::Ready(self.socket.read_buf())
        } else {
            // `socket` 还没有数据.
            //
            // 当数据来到,将调用 `wake`.
            // 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

Futures的这种模型允许组合多个异步操作而无需立刻分配资源。同时运行多个future或者串行(chaining)future 能够通过零分配(allocation-free)状态机实现,像这种:


# #![allow(unused_variables)]
#fn main() {
/// 一个基本的 `future`,它将同时运行其他两个 `future` 直到完成.
///
/// 并发特性是通过对每个 `future` 的轮询交错调用来实现的,
/// 从而允许每个 `future` 以自己的速度前进.
pub struct Join<FutureA, FutureB> {
    // 每个字段可能包含应该运行完成的 `future`.
    // 如果 `future` 运行完成,则将该字段设置为 `None`.
    // 这可以防止我们在运行完成之后再次对 `future` 轮询,
    // 这将不符合 `future` trait 的规范.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // 尝试运行完成这个 future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // 尝试运行完成这个 future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // 两个 `future` 都已经完成,我们可以返回成功回调.
            Poll::Ready(())
        } else {
            // 一个或者两个 `future` 都返回了 `Poll::Pending`,说明仍需要做其他工作.
            // 当有新的进度时,他们将调用 `wake()`.
            Poll::Pending
        }
    }
}
#}

上面代码展示了多个future如何同时执行而无需分别分配资源,这允许异步代码变得更高级。 类似,多个future可以一个接一个执行,像这样:


# #![allow(unused_variables)]
#fn main() {
/// 这是一个 `SimpleFuture`,依次运行直到两个 `future` 都完成.
//
// 提示: 这只是一个简单的示例, `AndThenFut` 是假设两个 `future` 在创建的时候都可用. 
// 真正的 `AndThen` 允许基于第一个 `future` 输出并创建第二个 `future`, 比
// 如 `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // 我们已经完成了第一个 `future`
                // 移除它,开始第二个.
                Poll::Ready(()) => self.first.take(),
                // 我们没有完成第一个 `future`.
                Poll::Pending => return Poll::Pending,
            };
        }
        // 现在,第一个 `future` 已经完成,
        // 那么就尝试完成第二个.
        self.second.poll(wake)
    }
}
#}

这个例子展示future特质如何表达异步控制流而无需请求多个已分配对象或深嵌套回调, 有了基本控制流后,我们来讨论真正的Future特质以及它和示例有什么区别:


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Output;
    fn poll(
        // 注意这个 `&mut self` 到 `Pin<&mut Self>` 的变化:
        self: Pin<&mut Self>,
        // 以及从 `wake: fn()` 到 `cx: &mut Context<'_>` 的变化:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
#}

我们首先注意到self参数类型不再是mut self而是Pin<&mut Self,。我们会在后面章节 更多地讨论固定(pinning)的问题,但先在我们只需要知道它能让我们创建不可移动的future类型。 不可移动对象(简称不动对象)能够储存指向另一字段(field)和指针,例如: struct MyFut { a: i32, ptr_to_a: *const i32 }。固定时于启动async/await是必需的。

然后wake: fn()变成了&mut Context<'_>。在SimpleFuture里,我们调用函数指针(fn()) 来告诉执行器返件future应该应该要轮询。然而,因为fn()是零大小的(zero-nzed),它不能 储存任何信息说明哪个Future调用了wake

在现实场景中,像Web服务器这样复杂的应用可能有上千不同的连接,带有应该相互隔离来管理的 唤醒器(wakeups)。Context类型通过提供对waker类型的访问来解决这个问题,这些waker 会唤起持定任务。