起步

欢迎使用Rust异步编程!如果你想开始编写Rust异步代码,那你找对地方了。无论你在构建Web服务器,数据库还是操作系统,本书都会教你如何使用Rust的异步编程工具来榨干硬件性能。

这本书讲什么?

本书旨在提供全面,最新的指南,让读者知道如何使用Rust的异步语言特性和代码库,萌新和老鸟都可食用。

  • 最初几章介绍异步编程概念,和Rust如何实现这些概念。
  • 中间章节讨论异步编程时可用的关键套件(utilities)和控制流工具,描述架构库和应用时最大化性能与可用性的最佳实践。
  • 最后部分涵盖更广泛的异步生态, 并示例如何完成常见任务。

就这样,我们来探索激动人心的Rust异步编程世界吧!

为什么使用异步?

我们都喜欢Rust让我们能够编写快速且安全的软件的方式,但为什么写异步代码呢?

异步编程,或者叫异步,是一种被越来越多编程语言支持的并发编程模型。它能够在一小撮 OS 线程上运行一大堆并发任务,同时还能通过 async/await 语法,保持原本同步编程的观感。

异步 vs 其他并发模型

并发编程相对于常规、顺序式编程不够成熟或“标准化”。结果是,我们表达并发的方式不一样,取决于语言支持哪种并发模型。简短地介绍最流行的并发模型能帮助你理解异步编程是如何适合更广阔的并发编程领域:

  • OS 线程 不需要编程模型作任何改动,这使得表达并发很容易。然而,线程间同步可能会很困难,并且性能开销很大。线程池可以减少一部分开销,但是不足够支持超大量 IO 密集负载。
  • 事件驱动编程,以及 回调,可以变得高性能,但倾向于导致冗长,“非线性”的控制流。数据流和错误传播通常就变得很难跟进了。
  • 协程,就像线程,但不需要改变编程模型,于是他们变得便于使用。像异步,他们可以支持大量的任务。然而,他们抽象了对于系统编程和自定义运行时实现非常重要的底层细节。
  • actor 模型 把所有的并发计算分割成称为 actor 的单元,相互之间通过易错的消息传递进行沟通,非常类似于分布式系统。actor 模型能够很高效地实现,但是它还很多没有解答的实践问题,例如流程控制和重入逻辑。

总之,异步编程既允许非常适合像Rust的低层语言的高效实现,同时也提供了线程和协程的大部分工效学效益。

Rust 的异步 vs 其他语言的

尽管很多语言都支持异步编程,但实现细节上有很多不一样。Rust 的异步实现和大部分语言的在以下方面有区别:

  • Rust 中 Futures 是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping)一个 future 可以阻止它继续执行。
  • Rust 中的 异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。
  • Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
  • Rust里 单线程的和多线程的 运行时都可用,而他们会有不同的优劣。

Rust 中的异步 vs 线程

Rust 中异步的首选替代是使用 OS 线程,可以直接通过 std::thread 或者间接通过线程池来使用。从线程模型迁移到异步模型,或者反过来,通常需要一系列重构的工作,既包括内部实现也包括任何暴露的公开接口(如果你在构建一个库)。因此,尽早地选择适合你需要的模型能够节约大量的开发事件。

OS 线程 适合少量任务,因为线程会有 CPU 和内存开销。生成和切换线程是代价相当昂贵,甚至闲置的线程也会消耗系统资源。一个线程池库可以减轻这些开销,但并不能全部健康。然而,线程能让你重新利用存在的同步代码,而不需要大改源代码——不需要特别的编程模型。一些操作系统中,你也可以改变线程的优先级,这对于驱动或者其他延迟敏感的应用很有用。

异步 极大地降低了 CPU 和内存开销,尤其是再负载大量越过IO 边界的任务,例如服务器和数据库。同样,你可以处理比 OS 线程更高数量级的任务,因为异步运行时使用少量(昂贵的)线程来处理大量(便宜的)任务。然而,异步 Rust 会导致更大的二进制体积,因为异步函数会生成状态机,并且每个可执行文件都会绑定一个异步运行时。

最后一点,异步编程并没有 更优于 线程模型,不过它们是不一样的。如果你不需要由于性能原因使用异步,线程通常是个更简单的替换。

例子:并发下载

这个例子的目标,是并发地下载两个网页。在典型的线程化(threaded)应用中,我们需要生成线程来达到并发:

fn get_two_sites() {
    // 生成两个线程来下载网页.
    let thread_one = thread::spawn(|| download("https:://www.foo.com"));
    let thread_two = thread::spawn(|| download("https:://www.bar.com"));

    // 等待两个线程运行下载完成.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

然而,下载网页是小任务,为了这么少量工作创建线程相当浪费。对更大的应用来说,这很容易就会变成瓶颈。在异步 Rust,我们能够并发地运行这些任务而不需要额外的线程:

async fn get_two_sites_async() {
    // 创建两个不同的 "futures", 当创建完成之后将异步下载网页.
    let future_one = download_async("https:://www.foo.com");
    let future_two = download_async("https:://www.bar.com");

    // 同时运行两个 "futures" 直到完成.
    join!(future_one, future_two);
}

这里没有创建额外的线程。此外,所有函数调用都是静态分发的,也没有堆分配!然而,我们需要先编写能够异步执行的代码,而这本书会帮助你做到。

Rust 中的自定义并发模型

最后一点, Rust 不会强制你从线程模型和异步模型中间只选一个。你可以在同一个应用里同时使用两个模型,这在你混合了线程化的和异步的依赖时非常有用。事实上,你甚至可以同时使用不同的并发模型,例如事件驱动编程,只要你能找到一个实现它的库。

异步 Rust 编程目前状态

一部分异步编程设施已经有了和同步编程一样的稳定性保证,另外的部分仍在发展和变化之中。有了异步 Rust,你可以预见:

  • 为典型的并发负载提供优秀的运行时性能
  • 更频繁地与先进的语言特性交互,例如生命周期(lifetime)和固定(pinning)
  • 一些兼容性保证,例如同步和异步代码之间,以及不同异步运行时之间的兼容性。
  • 更高的维护负担,因为异步运行时和语言支持都在持续演进。

简而言之,异步 Rust 相比同步 Rust 更难使用,并且可能导致更高维护负担,但是会给你一流性能作为回报。异步 Rust 的所有地方都在持续提高,所以这些问题的影响会随着时间慢慢消退。

语言和库支持

尽管 Rust 自身提供了异步编程支持,大部分异步应用基于社区库(community crates)提供的功能。因此,你需要需要同时依靠语言特性和库支持:

  • 最基础的 traits、类型(types)和函数(functions), 例如 Future trait,由标准库提供。
  • async/await 语法由 Rust 编译器直接支持。
  • 很多工具类型、宏和函数由 futures 库提供。他们可以用在任何异步 Rust 应用
  • 异步代码的执行、IO 和任务生成均由 “异步运行时” 提供支持,例如 Tokio 和 async-std. 多数异步应用,和一些异步库,都只依赖于一个特定的运行时,详情参见“异步生态”

有一些语言特性,读者也许在同步 Rust 编程中很习惯了,但是在异步 Rust 中不可用。要补充说明的是,Rust 不允许你在 trait 中声明异步函数。作为代替,你需要用一些方法绕过以实现相同结果,但是会显得更啰嗦。

编译与调试

多数情况下,编译器的和运行时的错误在异步 Rust 中以和在 Rust 中相同的方式工作,,但是会有以下值得注意的区别:

编译错误

异步 Rust 中的编译错误遵循和同步 Rust 一样的高标准,但因为异步 Rust 通常依赖于更复杂的语言特性,例如生命周期和固定(pinning),你可能更频繁遇到这些错误。

运行时错误

无论编译器在什么时候看到一个异步函数,它会在底层生成一个状态机。异步 Rust 中的堆栈追踪通常会包含这些状态机的细节,以及来自运行时的函数调用。因此对照堆栈追踪信息也会相比同步 Rust 更可能需要关注。

新失败模式

好几种新的失败模式可以在异步 Rust 中使用。举例来说,如果你从异步上下文中调用一个阻塞函数,或者检查是否正确地实现了 Future trait。这些错误会静默传递到编译器,有时会甚至会传到单元测试中。对这些底层概念形成扎实充分的理解,也是这本书设定的目标,也会帮助你避开这些坑。

兼容性考虑

异步的和同步的代码不总是能自由地结合在一起。例如,你不能直接在同步函数里直接调用一个异步函数。同步的和异步的代码倾向于不同的设计模式,会使整合为不同环境设计的代码很困难。

甚至,异步代码之间也不总是能自由地结合在一起。一些库依赖于特定运行时来提供功能。如此,它通常会在库的依赖列表中指定。

这些兼容性问题会限制你权衡,所以要尽早调查要使用哪个异步运行时和那些库。一旦以决定好了运行时,你就不需要太关心兼容性的问题了。

性能特征

异步Rust的性能取决于您所使用的异步运行时的实现。尽管为异步Rust应用程序提供支持的运行时相对较新,但对于大多数实际工作负载而言,它们仍然表现出色。

也就是说,大多数异步生态系统都采用 多线程 运行时。这使得难以获取单线程异步应用程序的理论性能优势,即便宜的同步。另一个被忽略的用例是对 延迟敏感的任务,这些任务对于驱动程序,GUI应用程序等非常重要。此类任务取决于运行时和/或OS支持,以便进行适当的调度。您可以期望将来会为这些用例提供更好的库支持。

async/.await初步

async/.await是Rust内置语法,用于让异步函数编写得像同步代码。async将代码块转化成 实现了Future trait 的状态机。使用同步方法调用阻塞函数会阻塞整个线程,但阻塞Future只会 让出(yield)线程控制权,让其他Future继续执行。

我们来加些依赖到 Cargo.toml 文件:

[dependencies]
futures = "0.3"

你可以使用async fn语法创建异步函数:


#![allow(unused)]
fn main() {
async fn do_something() { ... }
}

async fn函数返回实现了Future的类型。为了执行这个Future,我们需要执行器(executor)

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

async fn函数中, 你可以使用.await来等待其他实现了Future trait 的类型完成,例如 另外一个async fn的输出。和block_on不同,.await不会阻塞当前线程,而是异步地等待 future完成,在当前future无法进行下去时,允许其他任务运行。

举个例子,想想有以下三个async fn: learn_song, sing_songdance

async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }

一个“学,唱,跳舞”的方法,就是分别阻塞这些函数:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

然而,这样性能并不是最优——我们一次只能干一件事!显然我们必须在唱歌之前学会它,但是学唱 同时也可以跳舞。为了做到这样,我们可以创建两个独立可并发执行的async fn

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

这个示例里,唱歌之前必须要学习唱这首歌,但是学习唱歌和唱歌都可以和跳舞同时发生。如果我们 用了block_on(learning_song())而不是learn_and_sing中的learn_song().await, 那么当learn_song在执行时线程将无法做别的事,也让同时跳舞变得不可能。但是通过.await 执行learn_song的future,我们就可以在learn_song阻塞时让其他任务来掌控当前线程。 这样就可以做到在单线程并发执行多个future到完成状态。

揭秘: 执行 Future 与任务(Task)

在这一节,我们会讲解底层结构,理解 Future 和异步任务是如何调度的。如果你只对如何使用 Future 类型而不关心他们怎么工作的, 你可以直接跳到 async/await 章节。然而,这章讨论的内容很有用,可以帮助你理解 async/await 代码如何工作,也可以帮助你理解这些代码的运行时属性和性能属性,以及帮助你构建新的异步原语(primitives)。如果你现在决定跳过这章,你可能想要加个书签以便日后回头再看。

现在, 让我们来聊聊Future trait。

Future trait

Future trait 是 Rust 异步编程中心内容。它是一种异步计算,可以产生值(尽管这个值可以为空, 如 ())。简化版 future trait看起来可能像这样:


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Future 能通过调用 poll 的方式推进,这会尽可能地推进 future 到完成状态。如果 future 完成了, 那就会返回 poll::Ready(result)。如果 future 尚未完成,则返回 poll::Pending,并且安排 wake() 函数在 Future 准备好进一步执行时调用(译者注:注册回调函数)。当 wake() 调用 时,驱动 Future 的执行器会再次 poll 使得 Future 有所进展。

没有 wake() 函数的话,执行器将无从获知一个 future 是否能有所进展,只能持续轮询(polling) 所有 future。但有了 wake() 函数,执行器就能知道哪些 future 已经准备好轮询了。

例如,考虑一下场景:我们准备读取一个套接字(socket),它可能还没有可以返回的数据。如果它有 数据了,我们可以读取数据并返回 poll::Ready(data),但如果数据没有准备好,我们这个future 就会阻塞并且不能继续执行。当没有数据可用时,我们需要注册 wake 函数,以在有数据可用时告诉执行 器我们的 future 准备好进一步操作。一个简单的 SocketReadfuture 可能像这样:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

Futures的这种模型允许组合多个异步操作而无需立刻分配资源。同时运行多个future或者串行(chaining)future 能够通过零分配(allocation-free)状态机实现,像这种:

/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed -- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}

上面代码展示了多个 future 如何同时执行而无需分别分配资源,这允许异步代码变得更高级。 类似,多个 future 可以一个接一个执行,像这样:

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future -- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

这个例子展示 future trait 如何表达异步控制流而无需请求多个已分配对象或深嵌套回调, 有了基本控制流后,我们来讨论真正的 Future trait 以及它和示例有什么区别:

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

我们首先注意到 self 参数类型不再是 mut self 而是 Pin<&mut Self>,。我们会在后面章节 更多地讨论固定(pinning)的问题,但现在我们只需要知道它能让我们创建不可移动的future类型。 不可移动对象能够储存指向另一字段(field)的指针,例如:struct MyFut { a: i32, ptr_to_a: *const i32 }。固定对于启动 async/await 是必需的。

然后 wake: fn() 变成了 &mut Context<'_>。在 SimpleFuture 里,我们调用函数指针(fn()) 来告诉执行器有future需要轮询。然而,因为 fn() 是仅仅是个函数指针,它不能储存任何信息说明哪个 Future 调用了 wake

在现实场景中,像Web服务器这样复杂的应用可能有上千不同的连接,带有应该相互隔离来管理的 唤醒器(wakeups)。Context 类型通过提供对 waker 类型的访问来解决这个问题,这些 waker 会唤起持定任务。

Waker唤醒任务

future第一次轮询时没有执行完这事很常见。此时,future需要保证会被再次轮询以进展(make progress),而这由Waker类型负责。

每次future被轮询时, 它是作为一个“任务”的一部分轮询的。任务(Task)是能提交到执行器上 的顶层future。

Waker提供wake()方法来告诉执行器哪个关联任务应该要唤醒。当wake()函数被调用时, 执行器知道Waker关联的任务已经准备好继续了,并且任务的future会被轮询一遍。

Waker类型还实现了clone(),因此可以到处拷贝储存。

我们来试试用Waker实现一个简单的计时器future吧。

应用:构建计时器

这个例子的目标是: 在创建计时器时创建新线程,休眠特定时间,然后过了时间窗口时通知(signal) 计时器future。

首先,用cargo new --lib timer_future 命令来新建项目,并加入我们需要用来编写 src/lib.rs 的依赖:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

我们开始定义future类型吧。 我们的future需要一个方法,让线程知道计时器倒数完了,future 应该要完成了。我们准备用Arc<Mutex<..>>共享值来为沟通线程和future。

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

现在,我们来实现Future吧!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

很简单,对吧?如果线程已经设置成shared_state.completed = true,我们就搞定了!否则, 我们从当前任务克隆Waker并把它传到shared_state.waker,这样线程就能回头再唤醒这个任务。

重要的是,每次future轮询后,我们必须更新Waker,这是因为这个future可能会移动到不同的 任务去,带着不同的Waker。这会在future轮询后在不同任务间移动时发生。

最后,我们需要API来构造计时器并启动线程:

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

哇!这些就是我们构建一个简单计时器future所需的内容了。现在,只要一个执行器(Executor) 执行这个future...

应用:构建执行器

Rust 的 Future 是惰性的:它们不会干任何事,除非它们被驱动执行。一个驱动future类型的 方法是在 async 函数中使用 .await 调用,但这只是将问题抛到上一层:谁来跑在顶层 async 函数返回的 future 实例呢?为此,我们需要执行 Future 的执行器。

Future 执行器会拿一组顶层 Future 去跑 poll 方法,无论这些 Future 能否进展。通常, 执行器会 poll 一个 future 实例来启动。当 Future 通过调用 wake() 方法来指示他们准备好继续 进展,执行器就会把它们放入队列并再一次 poll,重复这一过程直到 Future 完成。

在这一小节,我们要写一个我们的简单执行器,能够并发地运行大量的顶层 future 实例。

这个例子中,我们依赖 futures 库的 ArcWake trait, 它提供了简便的构造 Waker 的方法。编辑 Cargo.toml 来引入新依赖:

[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

然后,我们在 src/main.rs中引入以下:

use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

我们的执行器通过给通道(channel)发送任务来工作。执行器会从通道中拉取事件并执行它们。当 一个任务准备好进一步工作(被唤醒了)时,它会被放到 channel 的末尾,来让自己再次被调度。

在设计时,执行器自身只需要任务通道的接收端。用户会拿到发送端,那样它们就可以开辟(spawn) 新的 future 实例。任务自身仅仅是能够重新调度自身的 future, 所以我们要把它们作为和发送端 配对的 future 存储。这个发送端能够让任务重新排队。

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

我们来加一个方法,让开辟器(spawner)更容易开辟新 future 吧。这个方法会获取一个 future 类型, 把它装箱并把它变成一个 FutureObj 对象,然后把这对象放到新的 Arc<Task> 里面。这个 Arc<Task> 能够放到执行器的队列中。

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

为了轮询 future,我们需要创建 Waker。正如在任务唤醒小节中讨论到,Waker 负责调度任务在 wake 函数调用时再次轮询。记住,Waker 告诉执行器具体哪个任务已经准备好了,这使得它们 可以只轮询已经准备好的 future。创建 Waker 的最简单方法是实现 ArcWake trait,然后使用 waker_ref 或者 .into_waker() 函数来把 Arc<impl ArcWake> 转变成 Waker。我们来给我们的任务实现 ArcWake,以便它们可以变成 Waker 并且被唤醒:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

WakerArc<Task> 创建了之后,调用 wake() 函数会拷贝一份 Arc,发送到任务的通道去。 我们的执行器就会拿到这个任务并轮询它。我们来实现这个吧:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if future.as_mut().poll(context).is_pending() {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

恭喜!我们现在有一个能干活的 future 执行器了。我们甚至能用它来运行 async/.await 代码和定制的 future,例如我们前面写的 TimeFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

执行器与系统IO

在前面 Future trait小节,我们讨论了一个对 socket 进行异步读取的 future 例子:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // `socket` 有数据的时候将其读取并放置在缓冲区并返回.
            Poll::Ready(self.socket.read_buf())
        } else {
            // `socket` 还没有数据.
            //
            // 当数据来到,将调用 `wake`.
            // 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

这个 future 会读取 socket 上的可用数据,如果没有数据可用,那它会让出执行器,请求在 socket 再次可读时唤醒它的任务。然而,从这个例子里我们不能很清楚知道 Socket 类型是如何实现的, 尤其是不了解 set_readable_callback 函数如何工作。我们要怎样安排 wake() 方法在一旦
socket 变得可用时调用一次呢?一个方案就是让一个线程持续不断地检查 socket 是否可读,然后 可读时马上调用 wake()。然而,这样子效率太低了,那对于每一个阻塞IO的 future 我们都需要 独立的线程。这会大大降低我们异步代码的效率。

实践时,这个问题是通过整合IO感知系统阻塞元件(IO-aware system blocking primitive), 像Linux上的 epoll, FreeBSD 和 Mac OS 的 kqueue, Windows 的 IOCP,以及 Fuchsia 的 poart (以上这些都通过了跨平台 Rust 库 mio 暴露出来)。这个元件全都允许线程阻塞多个异步IO事件, 一旦这些事件中有一个完成了,元件就会返回。实际上,这些API通常看着像这样:

struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);

future 执行器能够用这些元件来提供异步IO对象,例如可配置回调在特定IO事件出现时执行的 socket。 像上面我们的 SocketRead 例子,Socket::set_readable_callback 函数可能看起来像以下伪代码:

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

我们现在只需要一个执行器线程来接收并分发任何IO事件给特定 Waker,这些 Waker 会唤醒相应 任务,允许执行器在返回来检查更多IO事件之前,驱动更多任务完成。

async/.await

第一章,我们简单介绍了 async/.await,并且用它构建一个简单的服务器。这一章会详细讨论 async/.await,解释它如何工作以及 async 代码如何和传统Rust程序不同。

async/.await 是特殊的Rust语法,使得让出当前线程控制权成为可能,而不是阻塞它,也允许其他代码在等待一个操作完成时取得进展。

有两种主要的方法使用 async: async fnasync 块。两种方法都返回一个实现了 Future trait 的值:


// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

就像我们在第一章中看到,async 体以及其他 future 类型是惰性的:除非它们运行起来,否则它们什么都不做。运行 Future 最常见的方法是 .await 它。当 .awaitFuture 上调用时,它会尝试把 future 跑到完成状态。如果 Future 被阻塞了,它会让出当前线程的控制权。能取得进展时,执行器就会捡起这个 Future 并继续执行,让 .await 求解。

async 生命周期

和传统函数不同,async fn 会获取引用以及其他拥有非 'static 生命周期的参数,并返回被这些参数的 生命周期约束的 Future

// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

这意味着这些 future 被 async fn 函数返回后必须要在它的非 'static 参数仍然有效时 .await。 在通常的场景中,future 在函数调用后马上 .await(例如 foo(&x).await),并不会有大问题。然而,如果储存了这些 future 或者把它发送到其他的任务或者线程,那就有问题了。

把带有引用参数的 async fn 转化成一个'staticfuture 的一个常用的规避方法是把这些参数 和对 async fn 的函数调用封装到async 块中:

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

通过移动参数到 async 块中,我们把它的生命周期扩展到了匹配调用 foo 函数返回的 Future 的生命周期。

async move

async 块和闭包允许使用 move 关键字,这和普通的闭包一样。一个 async move 块会获取 所指向变量的所有权,允许它的生命周期超过当前作用域(outlive),但是放弃了与其他代码共享这些变量的能力:

/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{my_string}");
    };

    let future_two = async {
        // ...
        println!("{my_string}");
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{my_string}");
    }
}

在多线程执行器中 .await

提醒一下,在使用多线程的 Future 执行器时,一个 Future 可能在线程间移动,所以任何在 async 体中使用的变量必须能够穿过线程,因为任何 .await 都有可能导致线程切换。

这意味着使用 Rc&RefCell 或者其他没有实现 Send trait 的类型是不安全的,包括那些指向 没有 Sync trait 类型的引用。

(注意:使用这些类型是允许的,只要他们不是在调用 .await 的作用域内。)

类似的,横跨 .await 持有一个非 future 感知的锁这种做法是很不好的,因为它能导致整个线程池 锁上:一个任务可能获得了锁,.await 然后让出到执行器,允许其他任务尝试获取所并导致死锁。 为了避免这种情况,使用 futures::lock里的 Mutex 类型比起 std::sync 里面的更好。

固定(Pinning)

为了轮询 future,future 首先要用特殊类型 Pin<T> 来固定。如果你读了前面 执行 Future 与任务 小节中关于 Future trait 的解释,你会从 Future::poll 方法的定义中认出 Pin。但这意味什么?我们为什么需要它?

为什么需要固定

PinUnpin 标记 trait 搭配使用。固定保证了实现了 !Unpin trait 的对象不会被移动。为了理解这为什么必须,我们回忆一下 async/.await 怎么工作吧。考虑以下代码:

let fut_one = ...;
let fut_two = ...;
async move {
    fut_one.await;
    fut_two.await;
}

这段代码实际上创建了一个实现了 Future trait 的匿名类型,提供了 poll 方法,如下:

// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

poll 第一次调用时,它会轮询 fut_one。如果 fut_one 不能完成,那么 AsyncFuture::poll 就会返回。调用 poll 的 Future 会从上次中断的地方继续。这个过程会持续到 future 成功完成。

然而,如果我们在 async 块中用了引用呢?例如:

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

这会编译成什么结构呢?

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

这里,ReadIntoBuf future 持有了一个指向其他字段 x 的引用。然而,如果 AsyncFuture 被移动了,x 的位置(location)也会被移走,使得存储在 read_into_buf_fut.buf 的指针失效。

固定 future 到内存特定位置则阻止了这种问题,让创建指向 async 块的引用变得安全。

固定的细节

我们来用一个简单点的例子来理解固定吧。我们遇到了上面的问题,这本质是关于我们如何在 Rust 里处理引用和自引用类型(self-referential types)。

我们来看个例子:


#![allow(unused)]
fn main() {
#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}
}

Test 类型提供了方法,来获取字段 ab 的引用。因为 b 是指向 a 的引用,但由于 Rust 的借用规则,我们不能定义它的生命周期(lifetime),所以我们把它存成指针。现在我们有了一个自引用结构体了。

如果不把我们的数据四处转移,我们的例子可以运行得很好:

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b());
    println!("a: {}, b: {}", test2.a(), test2.b());

}
#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    // We need an `init` method to actually set our self-reference
    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

我们可以得到预期结果:


#![allow(unused)]
fn main() {
a: test1, b: test1
a: test2, b: test2
}

来看看如果我们把 test1test2 交换了,会发生什么:

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b());
    std::mem::swap(&mut test1, &mut test2);
    println!("a: {}, b: {}", test2.a(), test2.b());

}
#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

我们可能以为它只会把 test1 打印了两次:


#![allow(unused)]
fn main() {
a: test1, b: test1
a: test1, b: test1
}

但实际上我们得到得结果是:


#![allow(unused)]
fn main() {
a: test1, b: test1
a: test1, b: test2
}

现在指针 test2.b 仍然指向 test1 内部的旧位置。这个结构体不再是自引用的了,它持有一个指向不同对象的字段的指针。这意味着我们不能依赖 test2.b 的生命周期会和 test2 的生命周期绑定。

如你仍然有些疑惑,以下这个例子应该可以使你信服:

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b());
    std::mem::swap(&mut test1, &mut test2);
    test1.a = "I've totally changed now!".to_string();
    println!("a: {}, b: {}", test2.a(), test2.b());

}
#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

这张图能帮助我们可视化到底发生了什么:

图1:结构体交换前后 swap_problem

这图很容易展现未定义行为(Undefined Behavior, UB)以及其他类似的使用方式可能会出错。

固定的实践

我们来看看固定和 Pin 类型如何帮助我们解决这个问题。

Pin 类型包装了指针类型, 保证没有实现 Unpin 指针指向的值不会被移动。例如, Pin<&mut T>, Pin<&T>, Pin<Box<T>> 都保证了 T 不会被移动,即使 T: !Unpin.

多数类型被移走也不会有问题。这些类型实现了 Unpin trait。指向 Unpin 类型的指针能够自由地放进 Pin,或取走。例如,u8Unpin 的,所以 Pin<&mut T> 的行为就像普通的 &mut T,就像普通的 &mut u8

然而,那些被固定后不能再移动的类型有一个标记 trait !Unpin。 async/await 创建的 Future 就是一个例子。

固定到栈上

回到我们的例子。我们能用 Pin 来解决我们的问题。我们来看看,如果我们需要用一个固定的指针,我们的例子会编程什么样:


#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned, // This makes our type `!Unpin`
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}
}

如果我们的类型实现了 !Unpin,那么固定这个类型的对象到栈上总是 unsafe 的行为。你可以用像是 pin_utils 的库来在将数据固定到栈上的时候避免写 unsafe

下面,我们将对象 test1test2 固定到栈上:

pub fn main() {
    // test1 is safe to move before we initialize it
    let mut test1 = Test::new("test1");
    // Notice how we shadow `test1` to prevent it from being accessed again
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
    println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            // This makes our type `!Unpin`
            _marker: PhantomPinned,
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

现在,如果我们尝试将我们的数据移走,我们会遇到编译错误:

pub fn main() {
    let mut test1 = Test::new("test1");
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
    std::mem::swap(test1.get_mut(), test2.get_mut());
    println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned, // This makes our type `!Unpin`
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

类型系统会阻止我们移动这些数据。

重点记住,固定到栈总是依赖你在写 unsafe 代码时提供的保证。例如,我们知道了 &'a mut T被指向对象(pointee) 在生命周期 'a 期间固定,我们不知道被 &'a mut T 指向数据是否在 'a 结束后仍然不被移动。如果移动了,将会违反固定的协约。

另外一个常见错误是忘记遮蔽(shadow)原本的变量,因为你可以释放 Pin 然后移动数据到 &'a mut T,像下面这样(这违反了固定的协约):

fn main() {
   let mut test1 = Test::new("test1");
   let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
   Test::init(test1_pin.as_mut());

   drop(test1_pin);
   println!(r#"test1.b points to "test1": {:?}..."#, test1.b);

   let mut test2 = Test::new("test2");
   mem::swap(&mut test1, &mut test2);
   println!("... and now it points nowhere: {:?}", test1.b);
}
use std::pin::Pin;
use std::marker::PhantomPinned;
use std::mem;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            // This makes our type `!Unpin`
            _marker: PhantomPinned,
        }
    }

    fn init<'a>(self: Pin<&'a mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a<'a>(self: Pin<&'a Self>) -> &'a str {
        &self.get_ref().a
    }

    fn b<'a>(self: Pin<&'a Self>) -> &'a String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

固定到堆上

固定 !Unpin 类型到堆上,能给我们的数据一个稳定的地址,所以我们知道我们指向的数据不会在被固定之后被移动走。和在栈上固定相反,我们知道整个对象的生命周期期间数据都会被固定在一处。

use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
        let t = Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        };
        let mut boxed = Box::pin(t);
        let self_ptr: *const String = &boxed.a;
        unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };

        boxed
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let test1 = Test::new("test1");
    let test2 = Test::new("test2");

    println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
    println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}

一些函数需要他们协作的 future 是 Unpin 的。为了让这些函数使用不是 UnpinFutureStream,你首先需要这个值固定,要么用 Box::pin(创建 Pin<Box<T>>)要么使用 pin_utils::pin_mut!(创建 Pin<&mut T>)。Pin<Box<Fut>>Pin<&mut Fut> 都能用作 future,并且都实现了 Unpin

例如:

use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { /* ... */ }

let fut = async { /* ... */ };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { /* ... */ };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { /* ... */ };
pin_mut!(fut);
execute_unpin_future(fut); // OK

总结

  1. 如果 T: Unpin(默认会实现),那么 Pin<'a, T> 完全等价于 &'a mut T。换言之: Unpin 意味着这个类型被移走也没关系,就算已经被固定了,所以 Pin 对这样的类型毫无影响。

  2. 如果 T: !Unpin, 获取已经被固定的 T 类型示例的 &mut T需要 unsafe。

  3. 标准库中的大部分类型实现 Unpin,在 Rust 中遇到的多数“平常”的类型也是一样。但是, async/await 生成的 Future 是个例外。

  4. 你可以在 nightly 通过特性标记来给类型添加 !Unpin 约束,或者在 stable 给你的类型加 std::marker::PhatomPinned 字段。

  5. 你可以将数据固定到栈上或堆上

  6. 固定 !Unpin 对象到栈上需要 unsafe

  7. 固定 !Unpin 对象到堆上不需要 unsafeBox::pin可以快速完成这种固定。

  8. 对于 T: !Unpin 的被固定数据,你必须维护好数据内存不会无效的约定,或者叫 固定时起直到释放。这是 固定协约 中的重要部分。

Steam trait

Stream trait 与 Future 类似,但能在完成前返还(yield)多个值,与标准库中的 Iterator 类似:


#![allow(unused)]
fn main() {
trait Stream {
    /// 由 `stream` 产生的值的类型.
    type Item;

    /// 尝试解析 `stream` 中的下一项.
    /// 如果已经准备好,就重新运行 `Poll::Pending`, 如果已经完成,就重新
    /// 运行`Poll::Ready(Some(x))`,如果已经完成,就重新运行 `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
}

一个常见的使用 Stream 的例子是 futures 库中通道的 Receiver。每次 Sender 端发送一个值时,它就会返回一个 Some(val),并且会在 Sender 关闭且所有消息都接收后返还 None:


#![allow(unused)]
fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` 类似于 `Iterator::next`, 但会返回一个实现
    // 了 `Future<Output = Option<T>>` 的类型.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
}

迭代与并发

与同步的 Iterator 类似,有很多不同的方法可以迭代处理 Stream 中的值。有很多组合子风格的方法, 如 mapfilterfold,以及它们的“遇错即断”版本 try_maptry_filtertry_fold

不幸的是,for 循环不能用在 Stream 上,但是对于命令式编程风格(imperative style)的代码, while let 以及 next/try_next 函数还可以使用:


#![allow(unused)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // 对于 `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // 对于 `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

然而,如果我们每次只处理一个元素,我们就会失去并发的机会,而这又是我们编写异步代码的首要目的。 为了并发处理一个 Stream 的多个值,使用 for_each_concurrenttry_for_each_concurrent 方法:


#![allow(unused)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // 对于 `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}

同时执行多个 Future

直到现在,我们几乎只用 .await 来执行future,而这会阻塞并发任务,直到特定的 Future 完成。然而,真实的异步应用经常需要并发执行几个不同的操作。

这一章,我们会覆盖一些同事执行多个异步操作的方法:

  • join!:等待所有future完成
  • select!:等待其中一个future完成
  • 开辟(Spawning): 创建顶层任务,运行future至完成
  • FuturesUnordered: 一组返还子future的future

join!

futures::join 宏等待并发执行的多个不同 future 完成。

join!

当进行多个异步操作时,可以简单地用 .await 串行执行:


#![allow(unused)]
fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
}

然而,这实际上比必要的慢,因为我们不必在 get_book 完成后再 get_music。在其它编程语言 中,future 是运行至完成的,所以两个操作可以通过先调起 async fn 来启动 future,然后再分别 await 他们来并发操作:


#![allow(unused)]
fn main() {
// 这是错误示例,不要模仿
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
}

然而,Rust future 不会干任何事情,除非它们已经 .await 了。这意味着上面这两段代码都会串行执行 book_futuremusic_future 而非并发执行。为了正确地并发这两个future,使用 futures::join!


#![allow(unused)]
fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
}

join! 返回值是包含每个传入 future 的输出的元组。

try_join!

对于那些返回 Result 的 future,考虑使用 try_join! 而非 join。因为 join 只会在所有子 future 都完成后才会完成,它甚至会在子 future 返回 Err 之后继续处理。

join! 不同,try_join! 会在其中的子future返回错误后立即完成。


#![allow(unused)]
fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

注意,传进 try_join! 的 future 必须要用相同的错误类型。考虑使用 futures::future::TryFutureExt 库的 .map_err(|e| ...)err_into() 函数来统一错误类型:


#![allow(unused)]
fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

select!

futures::select 宏同时跑多个 future,允许用户在任意 future 完成时响应:


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // 为了 `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

上面的函数会并发跑 t1t2。当 t1t2 结束时,对应的句柄(handler)会调用 println!,然后函数就会结束而不会完成剩下的任务。

select 的基本格式为 <pattern> = <expression> => <code>,,可以重复你想 select 的任意多future。

default => ...complete => ...

select 也支持 defaultcomplete 分支。

default 会在被 select 的future都没有完成时执行,因此,带有 default 分支的 select 总是马上返回,因为 default 会在没有其它future准备好的时候返回。

complete 分支则用来处理所有被 select 的 future 都完成并且不需进一步处理的情况。这在循环 select 时很好用:


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // 永远不会被执行(futures都准备好了,然后complete分支被执行)
        };
    }
    assert_eq!(total, 10);
}
}

UnpinFusedFuture 交互

你会注意到,在上面第一个例子中,我们在两个 async fn 函数返回的future上调用了 .fuse(),然后用 pin_mut 来固定他们。这两个调用都是必需的,用在 select 中的 future 必须实现 UnpinFusedFuture

需要 Unpin 是因为 select 是用可变引用访问 future 的,不获取 future 的所有权。未完成的 future 因此可以在 select 调用后继续使用。

类似的,需要 FusedFuture 是因为 select 一定不能轮询已完成的 future。FusedFuture 用来追踪(track)future是否已完成。这种使得在循环中使用 select 成为可能,只轮询尚未完成的 future。这可以从上面的例子中看出,a_futb_fut 可能会在第二次循环的时候已经完成了。因为 future::ready 返回的 future 实现了 FusedFuture,所以 select 可以知道不必再次轮询它了。

注意,stream 也有对应的 FusedStream trait。实现了这个 trait 或者被 .fuse() 包装的 Stream 会从它们的 .next/try_next() 组合子中返还 FusedFutre


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

带有 FuseFuturesUnorderedselect 循环中的并发任务

有个不太好找但是很趁手的函数叫 Fuse::terminated()。这个函数允许构造已经被终止的空 future,并且能够在之后填进需要运行的 future。

这个在一个任务需要 select 循环中运行但是它本身是在 select 循环中创建的场景中很好用。

注意下面 .select_next_some() 函数的用法。它可以用在 select 上,并且只运行从 stream 返回的 Some(_) 值而忽略 None


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 计时器已经完成了.
                // 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 一个新的数字到达了
                // 启动一个新的`run_on_new_num_fut`并且扔掉旧的.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // 执行`run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // 当所有都完成时panic,
            // 因为理论上`interval_timer`会不断地产生值.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

当有很多份相同 future 的拷贝同时执行时,使用 FutureUnordered 类型。下面的例子和上面的例子很类似,但会运行 run_on_new_num_fut 的所有拷贝都到完成状态,而不是当一个新拷贝创建时就中断他们。它也会打印 run_on_new_num_fut 的返回值:


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// 用从`get_new_num`获取的最新的数字运行`run_on_new_num`.
//
// 每当定时器到期后,都会重新执行`get_new_num`,
// 并立即取消正在执行的`run_on_new_num`,随后用新返回值替换`run_on_new_num`.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 计时器已经完成了.
                // 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 一个新的数字到达了,启动一个新的`run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // 执行`run_on_new_num_futs`并检查有没有完成的.
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // 当所有都完成时panic,
            // 因为理论上`interval_timer`会不断地产生值.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}

需要知道并喜爱上的规避方法

Rust 的 async 支持还是相当新的,所以还有一大堆大家都想要的特性还在开发,以及一些不太用到分析技术。这章我们会讨论一些常见的痛点并解释如何规避它们。

async 块中的 ? 运算符

和在 async fn 中一样, 在 async 块中使用 ? 是很寻常的. 然而, async 块的返回类型不是被显式声明的. 这会导致编译器不能推断 async 块的错误类型.

如下面的代码:


#![allow(unused)]
fn main() {
struct MyError;
async fn foo() -> Result<(), MyError> { Ok(()) }
async fn bar() -> Result<(), MyError> { Ok(()) }
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
}

会触发这个错误:

error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type

不幸的是, 现在还不能"给 fut 一个类型", 也没有办法 显式地指定 async 块的返回类型. 要解决这个问题, 你可以使用"涡轮运算符"来为块 async 提供成功和错误时的类型:


#![allow(unused)]
fn main() {
struct MyError;
async fn foo() -> Result<(), MyError> { Ok(()) }
async fn bar() -> Result<(), MyError> { Ok(()) }
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
}

Send 模拟

一些 async fn 状态机是可以安全地跨线程传递(Send)的,但另外的不可以。一个 async fnFuture 是否 Send 取决于是否有非 Send 类型跨越 .await 点被持有了。当编译器发现有些值可能会跨 .await 持有时。编译器尽可能地模拟 Send,但是这种分析今天在一些地方过于保守。

例如,考虑一个简单的非 Send 类型,可能是一种持有 Rc 的类型:


#![allow(unused)]
fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
}

类型 NotSend 的变量可能会很简单地作为临时变量出现在 async fn 函数中,甚至会出现在 async fn 函数返回的 Future 类型必须是 Send 的时候:

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

然而,如果我们改动 foo 来存一个 NotSend 变量,这个例子就不再编译了:

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

这个错误是正确的。如果我们把 x 存到变量中去,它不会被丢弃(drop),直到 .await 之后,这时 async fn 可能在另外一个线程中运行。因为 Rc 不是 Send 的,允许它穿过线程是不合理的。一个简单的解决方法是应该在 .await 之前 drop 掉这个 Rc,但是不幸的是现在这种方法还不能工作。

为了规避这个问题,你可能需要引入一个块作用域来封装任何非 Send 变量。这会让编译器更容易发现这些变量不会存活超过 .await 点。

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}

递归

在内部,async fn 创建了一个包含了要 .await 的子 Future 的状态机。这样递归的 async fn 有点诡异,因为结果的状态机必须包含它自身:


#![allow(unused)]
fn main() {
async fn step_one() { /* ... */ }
async fn step_two() { /* ... */ }
struct StepOne;
struct StepTwo;
// This function:
async fn foo() {
    step_one().await;
    step_two().await;
}
// 生成一个这样的类型:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// 所以这个函数:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// 生成一个这样的类型:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
}

这不会工作——我们创建了大小为无限大的类型!编译器会抱怨:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

为了允许这种做法,我们需要用 Box 来间接调用。而不幸的是,编译器限制意味着把 recursive() 的调用包裹在 Box::pin 并不够。为了让递归调用工作,我们必须把 recursive 转换成非 async 函数,然后返回一个 .boxed() 的异步块


#![allow(unused)]
fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
}

trait 中的 async

目前,async fn 不能在 trait 中使用。原因一些复杂,但是有计划在未来移除这个限制。

不过,这个问题可以用crates.io 的 async_trait来规避。

注意,这些 trait 方法会导致每个函数调用都需要分配堆内存。这可能对于大部分应用都不是特别严重的开销,但是在决定是否要把这个功能作为底层函数的公共API,尤其这个函数可能每秒调用上百万次时则需要多加考虑。

异步生态系统

Rust目前仅提供编写异步代码最基础的能力。重要的是,标准库尚未提供执行器,任务,反应器,组合器以及底层I/O futures和 trait。同时,社区提供的异步生态系统填补了这些空白。

异步基础团队正在扩展书中的示例,来涵盖多个运行时。如果你对这个项目做出贡献有兴趣,请在 Zulip 上联系我们。

异步运行时

异步运行时是用于执行异步应用程序的库。运行时通常将一个反应器与一个或多个执行器捆绑在一起。反应器为外部事件提供订阅机制,例如异步I/O,进程间通信以及计时器。在异步运行时中,订阅用户通常是代表底层I/O操作的futures。执行器负责任务的计划和执行。它们跟踪正在运行和挂起的任务,轮询futures以完成任务,并在有进展时唤醒任务。 “执行器”一词经常与“运行时”互换使用。在这里,我们使用“生态系统”一词来描述一个绑定了兼容 trait 和功能的运行时。

社区提供的异步库

Futures库

futures包含可用于编写异步代码的 trait 和功能。这包括 StreamSinkAsyncReadAsyncWrite trait,以及诸如组合器的实用工具。这些实用工具和 trait 最终可能成为标准库的一部分。

futures 有它自己的执行器,但没有自己的反应器,所以它不支持异步I/O或计时器 futures 的执行。因为这个原因,它不被视为一个完整的运行时。一个常见的选择是将futures 中的实用工具与另一个库中的执行器一起使用。

主流的异步运行时

标准库中没有异步运行时,官方也没有建议这样做。下面列举的库提供了主流的运行时。

  • Tokio:一个具有HTTP,gRPC和跟踪框架的主流异步生态系统。
  • async-std:一个提供标准库组件级别的库。
  • smol:一个小且简单的异步运行时。提供可用于包装 UnixStreamTcpListener 此类的结构的 Async trait。
  • fuchsia-async:在Fuchsia操作系统中使用的执行器。

确定生态系统兼容性

并非所有异步应用程序,框架和库都彼此兼容,也不是和每个操作系统或平台都兼容。大多数异步代码可以在任一生态系统中使用,但是某些框架和库会要求使用特定的生态系统。生态系统限制并不总是记录在案的,但是有一些经验法则可以确定一个库,trait 或功能是否取依赖特定的生态系统。

与异步I/O,计时器,进程间通信或任务交互的异步代码通常都取依赖特定的异步执行器或反应器。除此以外的异步代码,例如异步表达式,组合器,同步类型和流,通常都与生态系统无关,所有嵌套的futures也与生态系统无关。在开始项目之前,建议先调研相关的异步框架和库,以确定与您选择的运行时以及彼此之间的兼容性。

值得注意的是,Tokio 使用 mio 反应器并定义了自己的异步I/O trait 的版本,包括 AsyncReadAsyncWrite。 它本身与 async-stdsmol 不兼容,它们依赖于async-executor以及在 futures 中定义的 AsyncReadAsyncWrite trait。

有时可以通过兼容性层解决运行时冲突需求,它允许您在另一个运行时调用为当前运行时编写的代码。 例如,async_compat提供了 Tokio 和其他运行时。

暴露异步API的库不应依赖于特定的执行器或反应器,除非它们需要生成任务、定义自己的异步I/O或者计时器futures。理想情况下,仅二进制程序应负责计划和运行任务。

单线程 vs 多线程执行器

异步执行器可以是单线程,也可以是多线程。例如,async-executor 同时具有单线程 LocalExecutor 和多线程 Executor

多线程执行器可以同时完成多个任务。对于具有许多任务的工作负载,它可以大大加快执行速度,但是在任务之间同步数据的开销通常更大。在单线程和多线程运行时之间进行选择时,建议测量应用程序的性能。

任务可以在创建它们的线程上运行,也可以在单独的线程上运行。异步运行时通常提供将任务生成到单独线程上的功能。即使任务在单独的线程上执行,它们也应该是非阻塞的。为了在多线程执行器上计划任务,它们必须是 Send 。一些运行时提供了生成 non-Send 任务的功能,从而确保了每个任务都在生成它的线程上执行。它们还可以提供将阻塞任务生成到专用线程上的功能,这对于运行来自其他库的阻塞同步代码很有用。

最终项目:用异步 Rust 构建一个并发 Web 服务器

在这一章,我们会使用异步 Rust 来修改 Rust 书的 一个单线程 web 服务器 来并发地服务请求

回顾

以下是我们那节课1最后得到的代码:

src/main.rs:

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    // 在端口7878侦听传入链接
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // 一直阻塞,处理到达这个IP地址的每一个请求
    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // 从流中读取前1024字节的数据
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    // 根据请求的数据决定响应问候还是404.
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    // 将响应写回流并刷新(flush)以确保响应被发送回客户端.
    let response = format!("{}{}", status_line, contents);
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

hello.html:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

404.html:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

如果你使用 cargo run 运行这个服务器,然后在浏览器中访问 127.0.0.1:7878,你会受到 Ferris 友好欢迎!

运行异步代码

一个 HTTP 服务器 理应能够并发地服务多个客户端;也就是,它不应该等待前一个请求完成后才处理当前请求。官方书通过创建一个线程池,使得每个链接都由独立的线程处理来 解决这个问题。在这里,与其通过增加线程来提高吞吐量,我们使用异步代码来达到同样的效果。

让我们修改 handle_connection 来返回一个 future,只需要声明为 async fn:

async fn handle_connection(mut stream: TcpStream) {
    //<-- snip -->
}

在函数声明里加上 async 会改变它的返回类型,从单元类型 () 变成一个实现了 Future<Output=()> 的类型。

如果我们尝试编译这个代码,编译器会警告我们它不会工作:

$ cargo check
    Checking async-rust v0.1.0 (file:///projects/async-rust)
warning: unused implementer of `std::future::Future` that must be used
  --> src/main.rs:12:9
   |
12 |         handle_connection(stream);
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: futures do nothing unless you `.await` or poll them

因为我们还没有 await 或者 poll handle_connection 返回的结果,它永远不会运行的。如果你这时运行了服务器,并且在浏览器中访问 127.0.0.1:7878,你会看到链接被拒绝了;我们的服务器没有在处理请求。

我们不能在同步代码中 await 或者 poll future 类型。我们需要一个异步运行时来处理调度及运行 future 类型至完成状态。请在 选择一个运行时小节 中获取更多关于异步运行时,执行器和反应器的信息。任何列出的运行时都可以在这个项目里工作,但是对于例子,我们选择了使用 async-std

增加异步运行时

这里我们会使用 async-std 库的执行器。async-std 库里的 #[async_std::main] 属性允许我们编写异步的 main 函数。为了使用它,得先在 Cargo.toml 里启用 async-stdattributes 特性:

[dependencies.async-std]
version = "1.6"
features = ["attributes"]

第一步,我们要切换到异步的 main 函数,并且 await 异步版 handle_connection 函数返回的 future。然后,我们需要测试服务器是怎样响应的。这里是代码应有的样子:

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        // Warning: This is not concurrent!
        handle_connection(stream).await;
    }
}

现在,我们来测试看看是否我们的服务器能够并发地处理连接。简单的把 handle_connection 改成异步并不意味着服务器可以同时处理多个链接,我们将看到为什么。

为了阐明这个原因,我们来模拟一个缓慢的请求。当客户端请求到 127.0.0.1:7878/sleep 时,我们的服务器会休眠 5 秒:

use async_std::task;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

这非常类似官方书中 模拟慢请求一节,但是有个重要区别:我们使用的是非阻塞函数 async_std::task::sleep 而不是阻塞函数 std::thread::sleep。这很重要,要记住一块代码是在 async fn 中并且被 await,因为它可能会阻塞。为了测试我们的服务器能否并发处理连接,我们需要保证 handle_connection 是非阻塞的。

如果你运行这个服务器,你会看到,一个发送给 127.0.0.1:7878/sleep 的请求,会阻塞其他后续的请求 5秒!这是因为当我们在 await handle_connection 的结果时,没有其他的并发任务能有进展。在下一小节,我们会看到如何使用异步代码来并发处理连接。

并发处理链接

现在我们代码的问题,是 listener.incoming() 是一个阻塞的迭代器。执行器不能在 listener 等待接入连接时运行其他 future,使得我们不能处理一个新连接,直到我们处理完前一个连接。

为了修复这个问题,我们要转化 listener.incoming(),从阻塞迭代器转换成非阻塞的流。流类似于迭代器,但是会异步地被消耗。更详细的请查看关于流的章节.

我们来把阻塞的 std::net::TcpListener 替换成非阻塞的 async_std::net::TcpListener,并且将我们的连接处理器更新为接受 async_std::net::TcpStream

use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

这异步版本的 TcpListener 为了能使用 listener.incoming,实现了Stream trait,这个改动有两个好处:首先,listener.incoming() 不再阻塞执行器了,执行器能够在没有其他接入的 TCP 连接需要处理时,让给其他还在等在的 future 对象继续执行。

第二个好处是,来自的流的元素能可选地被并发处理,通过流的 for_each_concurrent 方法。这里,我们会重复利用这个方法来并发处理每一个接入的请求。我们需要引入 futures 库的 Stream trait,所以我们的 Cargo.toml 现在看起来像这样:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

现在,我们能并发地处理每一个连接了,只要我们把 handle_connection 传递给一个闭包函数。这个闭包函数获取了每一个 TcpStream 的所有权,然后一旦 TcpStream 可用就尽快执行。只要我们的 handle_connection 不阻塞,一个慢请求就不会组织其他请求完成了。

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

并行地服务请求

我们的例子现在能够提供极大的并发了(通过使用异步代码),作为并行(使用线程)的替代方案。然而,异步代码和线程不是二者只得其一。在我们 的例子中, for_each_concurrent 并发地处理每一个连接,但不是在同一个线程。 async-std 库也允许我们生成任务到一个分离开的线程。 因为 handle_connection 既是 Send 的 也是非阻塞的,所以使用 async_std::task::spawn 是安全的。现在代码看起来会像这样子:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

现在我们同时使用了并发和并行,来同时处理多个请求!更详细的请查看 关于多线程执行器的小节

测试 TCP 服务器

现在我们来测试我们的 handle_connection 函数。

首先,我们需要 TcpStream 来支持工作。在一个端对端或者集成测试中,我们可能需要一个真正的 TCP 连接来测试我们的代码。一个做到这样的策略是在 localhost 的端口 0 启动一个监听器。端口 0 并不是一个合法 UNIX 端口,但它可以用于测试。操作系统会帮我们挑一个开放的 TCP 端口。

替代的,这个示例中会给连接处理器写一个单元测试,来检查 正确的响应会返回给对应的输入。为了当我们的单元测试是隔离的以及决定性的,我们会用 mock 来替换 TcpStream

首先,我们要更改 handle_connection 的签名,来使得它更容易测试。handle_connection 其实并不需要 async_std::net::TcpStream,它需要的是任意已经实现了 async_std::io::Read, async_std::io::Writemarker::Unpin。这样修改类型签名允许我们传递一个 mock 来测试。

use std::marker::Unpin;
use async_std::io::{Read, Write};

async fn handle_connection(mut stream: impl Read + Write + Unpin) {

接下来,我们需要将建一个实现了这些 trait 的 mock TcpStream。首先,我们先实现 Read trait,只需要一个方法 poll_read。我们的 mock TcpStream 会包含一些需要拷贝到读取缓存的数据,然后我们返回 Poll::Ready 来表示读取已经完成。

    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};

    use std::cmp::min;
    use std::pin::Pin;

    struct MockTcpStream {
        read_data: Vec<u8>,
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

我们 Write trait 的实现非常简单,尽管我们需要写三个方法: poll_write, poll_flush, 和 poll_closepoll_write 会拷贝任何输入数据到 mock TcpStream,然后回在完成时返回 Poll::Ready。没有工作需要 flush 或者 close 这个 mock TcpStream, 所以 poll_flushpoll_close 可以直接返回 Poll::Ready

    impl Write for MockTcpStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            self.write_data = Vec::from(buf);

            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

最后,我们的 mock 还需要实现 Unpin,标记它所在的内存位置可以安全地转移。关于固定和 Unpin 的更多信息,请查看 关于固定的章节

    use std::marker::Unpin;
    impl Unpin for MockTcpStream {}

现在我们准备好测试这个 handle_connection 函数了。设置好包含初始数据的 MockTcpStream 之后,我们能够通过属性注解 #[async_std::test] 执行 handle_connection,这很类似我们怎么使用 #[async_std::main]。为了保证 handle_connection 正常工作,我们要根据 MockTcpStream 的初始内容来检查正确的数据已经写入。

    use std::fs;

    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].clone_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;
        let mut buf = [0u8; 1024];
        stream.read(&mut buf).await.unwrap();

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
        assert!(stream.write_data.starts_with(expected_response.as_bytes()));
    }