起步

欢迎使用Rust异步编程!如果你想开始编写Rust异步代码,那你找对地方了。无论你在构建Web服务器, 数据库还是操作系统,本书都会教你如何使用Rust的异步编程工具来榨干硬件性能

这本书讲什么?

本书旨在为萌新和老鸟提供全面,最新的指南,知道如何使用Rust的异步语言特性和代码库:

  • 最初几章介绍异步编程概念,和Rust如何实现这些概念。
  • 中间章节讨论异步编程时可用的关键套件(utilities)和控制流工具,描述架构库和应用时 最大化性能与可用性的最佳实践。
  • 最后部分涵盖更广泛的异步生态, 并示例如何完成常见任务。

就这样, 我们来探索激动人心的Rust异步编程世界吧!

为什么使用异步?

我们都喜欢Rust让我们能够编写快速且安全的软件的方式,但为什么写异步代码呢?

异步代码允许我们在单个OS线程中并发执行多个任务。在使用典型线程化(threaded)应用时, 如果你想同时下载两个不同的网络报, 那么你会将任务分给两个线程,像这样:


# #![allow(unused_variables)]
#fn main() {
fn get_two_sites() {
    // 生成两个线程来下载网页.
    let thread_one = thread::spawn(|| download("https:://www.foo.com"));
    let thread_two = thread::spawn(|| download("https:://www.bar.com"));

    // 等待两个线程运行下载完成.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}
#}

大多数应用都能很好地运行这份代码——毕竟,线程就是设计成这样用的:一次性运行多个不同任务。 然而,它们也有限制。线程切换过程和线程共享数据时会产生大量开销,甚至空跑线程也会占用珍贵 系统资源。这其中可以通过异步代码设计来减少多余的开销"我们可以用Rust的async/.await语法 重写上面的代码,这样我们就可以一次性运行多任务而不需创建多个线程:


# #![allow(unused_variables)]
#fn main() {
async fn get_two_sites_async() {
    // 创建两个不同的 "futures", 当创建完成之后将异步下载网页.
    let future_one = download_async("https:://www.foo.com");
    let future_two = download_async("https:://www.bar.com");

    // 同时运行两个 "futures" 直到完成.
    join!(future_one, future_two);
}
#}

最后,异步应用和对应线程化实现相比,有潜力快得多并占用更少资源,但这是有代价的。OS天然 支持线程,使用它们不需要特定编程模型——任意函数都能创建线程,并且调用那些使用了线程的函数 通常和调用普通函数一样容易。然而,异步函数需要语言或库的特别支持。在Rust中,async fn 创建一个返回Future类型的函数。为了执行函数体,返回Future实例必须运行至完成状态。

重要是记住:传统线程化应用也能很高效,而且Rust的精细内存足迹和可预测性意味着你不需要用 async,你也可以走很远。异步编程模型带来的好处并不总是能够超过带来的复杂度增加,所以 考虑你的应用是否能够用单线程模型来获得更好表现也是很重要的。

异步Rust编程目前状态

随着时间推移,异步Rust生态历经了一系列变革,所以很难说清楚现在应该要用什么工具,应当投入 到什么库,或者该读什么文档。然而,最近Future特质(trait)已在标准库中稳定, async/await特性(feature)也会在不远的将来稳定。生态系统因此开始迁移至新稳定的API, 这之后就没有现在那么混乱了(译者注:指当前生态同时使用互不兼容的Future 0.1, 0.2, 0.3 版本API的混乱局面)

然而,此时此刻生态仍在快速发展,异步Rust体验依然有点糙,大多数库仍然使用futures库0.1 版本,这意味着库维护开发者需要频繁从futures0.3版本获取compat功能。async/await 语言特性也还很新,一些重要扩展,如特质的async fn语法也还没实现,并且当前编译器错误 信息也很难解析。

也就是说:Rust在前进道路上进展还算顺利,在为异步编程取得最优性能与最优工程学支持的路上; 如果你不介意现在就进行冒险,那来潜入Rust异步编程的世界吧!

async/.await初步

async/.await是Rust内置语法,用于让异步函数编写得像同步代码。async将代码块转化成 实现了Future特质的状态机。使用同步方法调用阻塞函数会阻塞整个线程,但阻塞Future只会 让出(yield)线程控制权,让其他Future继续执行。

你可以使用async fn语法创建异步函数:


# #![allow(unused_variables)]
#fn main() {
async fn do_something() { ... }
#}

async fn函数返回实现了Future的类型。为了执行这个Future,我们需要执行器(executor)

// `block_on` 将阻塞当前的线程,直到 `future` 运行完成.
// 其他执行器提供了更复杂的特性,比如将多个 `future` 安排到同一个线程上面.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // 没有输出.
    block_on(future); // `future` 运行,输出 "hello, world!".
}

async fn函数中, 你可以使用.await来等待其他实现了Future特质的类型完成,例如 另外一个async fn的输出。和block_on不同,.await不会阻塞当前线程,而是异步地等待 future完成,在当前future无法进行下去时,允许其他任务运行。

举个例子,想想有以下三个async fn: learn_song, sing_songdance


# #![allow(unused_variables)]
#fn main() {
async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }
#}

一个“学,唱,跳舞”的方法,就是分别阻塞这些函数:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

然而,这样性能并不是最优——我们一次只能干一件事!显然我们必须在唱歌之前学会它,但是学唱 同时也可以跳舞。为了拽黑暗,我们可以创建两个独立可并发执行的async fn

async fn learn_and_sing() {
    // 要唱歌必须得先学会歌曲.
    // 我们这里使用 `.await` 而不是 `block_on` 来
    // 防止线程阻塞, 这样也可以同时跳舞.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` 类似 `.await`,但是可以同时等待多个 `future` 执行完成.
    // 如果我们 `learn_and_sing` 这个 `future` 被阻塞, 那么 `dance`
    // 这个 `future` 将接管当前的线程. 如果 `dance` 被阻塞, 那么 `learn_and_sing`
    // 就可以重新开始. 如果这个两个 `future` 都被阻塞, 那么 `async_main`
    // 也将被阻塞并让位给执行程序.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

这个示例里,唱歌之前必须要学习唱这首歌,但是学习唱歌和唱歌都可以和跳舞同时发生。如果我们 用了block_on(learning_song())而不是learn_and_sing中的learn_song().await, 那么当learn_song在执行时线程将无法做别的事,这也使得无法同时跳舞。但是通过.await 执行learn_song的future,我们就可以在learn_song阻塞时让其他任务来掌控当前线程。 这样就可以做到在单线程并发执行多个future到完成状态。

现在,你已经学会了async/await基础,现在我们来试着写一个例子吧。

应用:建立HTTP服务器

让我们用async/.await来构建一个回显(echo)服务器吧!

首先,运行rustup update nightly,确保我们用的是伟大的Rust的最新拷贝——我们要用最新潮 的特性,所以保持更新很必要。然后,运行cargo +nightly new async-await-echo来创建 新项目,并打开生成的async-await-echo文件夹。

现在给Cargo.toml文件添加依赖:

[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures-preview = { version = "=0.3.0-alpha.16", features = ["compat"] }

# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"

现在我们搞定了依赖,让我们开始写代码。打开src/main.rs并在文件开头启用async_await 特性:


# #![allow(unused_variables)]
#![feature(async_await)]
#fn main() {
#}

这使我们现在就可以使用夜版专属(nightly-only)的async/await语法。这个语法很快就会 稳定的。

此外,我们还要加些导入(import):


# #![allow(unused_variables)]
#fn main() {
use {
    hyper::{
        // `Hyper` 中用于处理 `HTTP` 的类型.
        Body, Client, Request, Response, Server, Uri,

        // 这个函数将一个 `future` 的闭包返回转换为 `Hyper Server` trait.
        // 这是一个从普通的请求到响应的异步函数.
        service::service_fn,

        // 这个函数使用 `Hyper` 运行时运行 `future` 直到完成.
        rt::run,
    },
    futures::{
        // `futures 0.1` 的扩展 `trait`, 加上这个 `.compat()` 方法
        // 这可以使我们在 `futures 0.1` 上使用 `.await`.
        compat::Future01CompatExt,
        // 扩展的 `trait` 提供对 `future` 的额外补充方法.
        // `FutureExt` 增加了适用于所有 `future` 的方法,
        // 而 `TryFutureExt` 则向返回 `Result` 类型的 `future` 添加方法.
        future::{FutureExt, TryFutureExt},
    },
    std::net::SocketAddr,
};
#}

导入之后,我们就能开始拼模板写服务了:

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // 总是返回一个包含 `hello, world!` 的成功响应.
    Ok(Response::new(Body::from("hello, world!")))
}

async fn run_server(addr: SocketAddr) {
    println!("Listening on http://{}", addr);

    // 在指定的地址上创建服务器.
    let serve_future = Server::bind(&addr)
        // 使用 `async serve_req` 函数来处理请求.
        // `serve` 接受一个闭包,它将返回一个实现了 `Service` trait的类型.
        // `service_fn` 返回一个实现了 `Service` trait的值,并接受一个从
        // 请求到响应的 `future` 闭包, 要在 `Hyper` 中使用 `serve_req` 函
        // 数,我们必须将它打包好并将其放入兼容性容器中,以便从 `futures 0.3`
        // (由 `async fn`返回的那种) 转换到 `futures 0.1` (由 `Hyper` 使
        // 用的那种 ).
        .serve(|| service_fn(|req| serve_req(req).boxed().compat()));
    
    // 等待服务器完成服务或者因错误退出.  
    // 如果发生错误,将错误输出到 `stderr`.
    if let Err(e) = serve_future.compat().await {
        eprintln!("server error: {}", e);
    }
}

fn main() {
    // 设置 `socket` 地址.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    // 调用我们的 `run_server` 函数, 它将返回一个 `future`.
    // 和 `async fn` 一样, 要让 `run_server` 执行任何操作,
    // 都需要运行返回的 `future`. 并且我们需要将返回的 
    // `future` 从 `0.3` 转换为 `0.1`.
    let futures_03_future = run_server(addr);
    let futures_01_future = futures_03_future.unit_error().boxed().compat();

    // 最后,我们使用 `Hyper` 提供的 `run` 函数来运行未完成的 `future`.
    run(futures_01_future);
}

如果你此时执行cargo run,你应该能看到"Listening on http://127.0.0.1:3000"打印到 终端上。如果你用浏览器打开这URL,你会看到"Hello, world!"。可喜可贺!你刚用Rust写了 第一个异步Web服务器!

你也可以检查请求,里面包含了很多信息,像请求URI,HTTP版本,报文头和其他元数据。例如, 我们可以输出请求URI,像这样:


# #![allow(unused_variables)]
#fn main() {
println!("Got request at {:?}", req.uri());
#}

你可能注意到我们在处理请求时还没有做任何异步操作——我们立刻返回,所以我们并没有充分利用 async fn函数提供给我们的灵活优势。比起返回静态信息,我们来试着来用Hyper的HTTP客户端 把用户请求代理到另外的网站。

我们从解析我们想要发送请求的URL开始:


# #![allow(unused_variables)]
#fn main() {
let url_str = "http://www.rust-lang.org/en-US/";
let url = url_str.parse::<Uri>().expect("failed to parse URL");
#}

然后我们创建一个hyper::Client,并用它发送Get请求,并返回响应给用户:


# #![allow(unused_variables)]
#fn main() {
let res = Client::new().get(url).compat().await;
// 将请求的结果直接返回给调用者.
println!("request finished-- returning response");
res
#}

Client::get返回一个hyper::client::FutureResponse, 它实现了 Future<Output = Result<Response, Error>> (或者在futures 0.1我们叫Future<Item = Response, Error = Error>)。 当我们.await这个future时, 发送了一个HTTP request, 当前任务挂起了,然后排队等待响应 可用时继续执行。

现在,如果你执行cargo run并打开http://127.0.0.1:3000/foo,你会看到Rust主页,以及 以下命令行输出:

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

再次恭喜!你刚刚代理了一个HTTP请求!

揭秘: 执行Future与任务(task)

在这一节,我们会讲解底层结构,理解Future和异步任务是如何调度的。如果你只对如何使用 Future类型1而不关心他们怎么工作的, 你可以直接跳到async/await章节。然而,这章 讨论的内容很有用,可以帮助你理解async/await代码如何工作,也可以帮助你理解这些代码的 运行时属性和性能属性,以及帮助你构建新的异步原语(primitives)。如果你现在决定跳过这章, 你可能想要加个书签以便日后回头再看。

现在, 让我们来聊聊Future特质。

Future特质

Future特质是Rust异步编程中心内容。它是一种异步计算,可以产生值(尽管这个值可以为空, 如())。简化版future特质看起来可能像这样:


# #![allow(unused_variables)]
#fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
#}

Future能通过调用poll的方式推进,这会尽可能地推进future到完成状态。如果future完成了, 那就会返回poll::Ready(result)。如果future尚未完成,则返回poll::Pending,并且安排 wake()函数在Future准备好进一步执行时调用(译者注:注册回调函数)。当wake()调用 时,驱动Future的执行器会再次poll使得Future有所进展。

没有wake()的话,执行器将无从获知一个future是否能有所进展,并且会持续轮询(polling) 所以future。但有了wake()函数,执行器就能知道哪些future已经准备好轮询了。

例如,考虑一下场景:我们准备读取一个套接字(socket),它可能还没有用数据返回。如果它有 数据了,我们可以读取数据并返回poll::Ready(data),但如果没有数据准备好,我们这个future 就会阻塞并且不能继续进行。当没有数据可用时,我们必须注册wake函数,告诉执行器我们的 future准备好进一步操作。一个简单的SocketReadfuture可能像这样:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // `socket` 有数据的时候将其读取并放置在缓冲区并返回.
            Poll::Ready(self.socket.read_buf())
        } else {
            // `socket` 还没有数据.
            //
            // 当数据来到,将调用 `wake`.
            // 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

Futures的这种模型允许组合多个异步操作而无需立刻分配资源。同时运行多个future或者串行(chaining)future 能够通过零分配(allocation-free)状态机实现,像这种:


# #![allow(unused_variables)]
#fn main() {
/// 一个基本的 `future`,它将同时运行其他两个 `future` 直到完成.
///
/// 并发特性是通过对每个 `future` 的轮询交错调用来实现的,
/// 从而允许每个 `future` 以自己的速度前进.
pub struct Join<FutureA, FutureB> {
    // 每个字段可能包含应该运行完成的 `future`.
    // 如果 `future` 运行完成,则将该字段设置为 `None`.
    // 这可以防止我们在运行完成之后再次对 `future` 轮询,
    // 这将不符合 `future` trait 的规范.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // 尝试运行完成这个 future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // 尝试运行完成这个 future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // 两个 `future` 都已经完成,我们可以返回成功回调.
            Poll::Ready(())
        } else {
            // 一个或者两个 `future` 都返回了 `Poll::Pending`,说明仍需要做其他工作.
            // 当有新的进度时,他们将调用 `wake()`.
            Poll::Pending
        }
    }
}
#}

上面代码展示了多个future如何同时执行而无需分别分配资源,这允许异步代码变得更高级。 类似,多个future可以一个接一个执行,像这样:


# #![allow(unused_variables)]
#fn main() {
/// 这是一个 `SimpleFuture`,依次运行直到两个 `future` 都完成.
//
// 提示: 这只是一个简单的示例, `AndThenFut` 是假设两个 `future` 在创建的时候都可用. 
// 真正的 `AndThen` 允许基于第一个 `future` 输出并创建第二个 `future`, 比
// 如 `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // 我们已经完成了第一个 `future`
                // 移除它,开始第二个.
                Poll::Ready(()) => self.first.take(),
                // 我们没有完成第一个 `future`.
                Poll::Pending => return Poll::Pending,
            };
        }
        // 现在,第一个 `future` 已经完成,
        // 那么就尝试完成第二个.
        self.second.poll(wake)
    }
}
#}

这个例子展示future特质如何表达异步控制流而无需请求多个已分配对象或深嵌套回调, 有了基本控制流后,我们来讨论真正的Future特质以及它和示例有什么区别:


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Output;
    fn poll(
        // 注意这个 `&mut self` 到 `Pin<&mut Self>` 的变化:
        self: Pin<&mut Self>,
        // 以及从 `wake: fn()` 到 `cx: &mut Context<'_>` 的变化:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
#}

我们首先注意到self参数类型不再是mut self而是Pin<&mut Self,。我们会在后面章节 更多地讨论固定(pinning)的问题,但先在我们只需要知道它能让我们创建不可移动的future类型。 不可移动对象(简称不动对象)能够储存指向另一字段(field)和指针,例如: struct MyFut { a: i32, ptr_to_a: *const i32 }。固定时于启动async/await是必需的。

然后wake: fn()变成了&mut Context<'_>。在SimpleFuture里,我们调用函数指针(fn()) 来告诉执行器返件future应该应该要轮询。然而,因为fn()是零大小的(zero-nzed),它不能 储存任何信息说明哪个Future调用了wake

在现实场景中,像Web服务器这样复杂的应用可能有上千不同的连接,带有应该相互隔离来管理的 唤醒器(wakeups)。Context类型通过提供对waker类型的访问来解决这个问题,这些waker 会唤起持定任务。

Waker唤醒任务

future第一次轮询时没有执行完这事很常见。此时,future需要保证会被再次轮询以进展(make progress),而这由Waker类型负责。

每次future被轮询时, 它是作为一个“任务”的一部分轮询的。任务(Task)是能提交到执行器上 的顶层future。

Waker提供wake()方法来告诉执行器哪个关联任务应该要唤醒。当wake()函数被调用时, 执行器知道Waker关联的任务已经准备好继续了,并且任务的future会被轮询一遍。

Waker类型还实现了clone(),因此可以到处拷贝储存。

我们来试试用Waker实现一个简单的计时器future吧。

应用:构建计时器

这个例子的目标是: 在创建计时器时创建新线程,休眠特定时间,然后过了时间窗口时通知(signal) 计时器future。

这是我们开始时需要的导入:


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

我们开始定义future类型吧。 我们的future需要一个方法,让线程知道计时器倒数完了,future 应该要完成了。我们准备用Arc<Mutex<..>>共享值来为沟通线程和future。


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// 在 `future` 线程和 `awiting` 线程之间共享状态
struct SharedState {
    /// 是否已经达到休眠时间.
    completed: bool,

    /// `TimerFuture` 表示正在运行的 `waker`.
    /// 线程可以在设置完 `completed = true` 之后来通知 `TimerFuture` 任务被唤醒并
    /// 检查 `completed = true`,然后继续执行.
    waker: Option<Waker>,
}
#}

现在,我们来实现Future吧!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 检查共享状态,检查定时器是否已经完成.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置 `waker`, 让线程可以在定时器完成时唤醒当前 `waker`,确保
            // 再次轮询 `future` 并获知 `completed = true`.
            //
            // 这样做是非常不错的,而不用每次都重复 `clone` `waker`. 然而, 这个 `TimerFuture`
            // 可以在执行器之间移动, 这可能会导致旧的 `waker` 指向错误的 `waker`, 这会阻止 
            // `TimerFuture` 被正确得唤醒.
            //
            // 注意:可以使用 `Waker::will_wake` 函数来做检查, 但是
            // 为了简单起见,我们忽略了他.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

很简单,对吧?如果线程已经设置成shared_state.completed = true,我们就搞定了!否则, 我们从当前任务克隆Waker并把它传到shared_state.waker,这样线程就能回头再唤醒这个任务。

重要的是,每次future轮询后,我们必须更新Waker,这是因为这个future可能会移动到不同的 任务去,带着不同的Waker。这会在future轮询后在不同任务间移动时发生。

最后,我们需要API来构造计时器并启动线程:


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// 创建一个新的 `TimerFuture`,它将在提供的超时之后完成.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 创建一个新的线程.
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 设置状态,表示定时器已经完成,并唤醒轮询 `future` 中的最后一个
            // 任务 (如果存在的话).
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

哇!这些就是我们构建一个简单计时器future所需的内容了。现在,只要一个执行器(Executor) 执行这个future...

应用:构建执行器

Rust的Future是惰性的:它们不会干任何事,除非它们被驱动执行。一个驱动future类型的 方法是在async函数中使用.await调用,但这只是将问题抛到上一层:谁来跑在顶层async 函数返回的future实例呢?为此,我们需要执行Future的执行器。

Future执行器会拿一组顶层Future去跑poll方法,无论这些Future能否进展。通常, 执行器会poll一个future实例来启动。当Future通过调用wake()方法来指示他们准备好继续 进展,执行器就会把它们放入队列并再一次poll,重复这一过程直到Future完成。

在这一小节,我们要写一个我们的简单执行器,能够并发地运行大量的顶层future实例。

为此,我们需要依赖futures库的ArcWake特质,这个特质提供了构造Waker的简易方法。

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.17"

然后,我们在src/main.rs中引入以下:


# #![allow(unused_variables)]
#fn main() {
use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // 我们在上一章中写过的定时器:
    timer_future::TimerFuture,
};
#}

我们的执行器通过给通道(channel)发送任务来工作。执行器会从通道中拉取事件并执行它们。当 一个任务准备好进一步工作(被唤醒了)时,它会被放到channel的末尾,来让自己再次被调度。

在设计时,执行器自身只需要任务通道的接收端。用户会拿到发送端,那样它们就可以开辟(spawn) 新的future实例。任务自身仅仅是能够重新调度自身的future, 所以我们要把它们作为和发送端 配对的future存储。这个发送端能够让任务重新排队。


# #![allow(unused_variables)]
#fn main() {
/// 从管道中接收任务并运行它们的执行程序.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` 在任务管道中创建新的 `future`.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// 一个可以重新安排自己被 `Executor` 调用的 `future`.
struct Task {
    /// 正在运行的 `future` 应该被推进到运行完成.
    ///
    /// 这个 `Mutex` 不是必要的, 因为我们一次只有一个线程
    /// 执行任务,但是,Rust不够聪明,没有办法知道 `future`
    /// 只会在一个线程中发生变化,所以我们需要 `Mutex` 来
    /// 让Rust知道我们保证了跨线程之间的安全性.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 将任务放回到任务队列.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 允许通知在管道中排队的最大任务数.
    // 这只是为了让 `sync_channel` 满足, 并不会出现在真正的执行器中.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender})
}
#}

我们来加一个方法,让开辟器(spawner)更容易开辟新future吧。这个方法会获取一个future类型, 把它装箱并把它变成一个FutureObj对象,然后把这对象放到新的Arc<Task>里面。这个Arc<Task> 能够放到执行器的队列中。


# #![allow(unused_variables)]
#fn main() {
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}
#}

为了轮询future,我们需要创建Waker。正如在任务唤醒小节中讨论到,Waker负责调度任务 在wake函数调用时再次轮询。记住,Waker告诉执行器具体哪个任务已经准备好了,这使得它们 可以只轮询已经准备好的future。创建Waker的最简单方法是实现ArcWake特质,然后使用 waker_ref或者.into_waker()函数来把Arc<impl ArcWake>转变成Waker。我们来给 我们的任务实现ArcWake,以便它们可以变成Waker并且被唤醒:


# #![allow(unused_variables)]
#fn main() {
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 通过将这个任务发送回任务管道来实现 `wake`,
        // 以便让执行器再次轮询它.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}
#}

WakerArc<Task>创建了之后,调用wake()函数会拷贝一份Arc,发送到任务的通道去。 我们的执行器就会拿到这个任务并轮询它。我们来实现这个吧:


# #![allow(unused_variables)]
#fn main() {
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 以 `future` 为例子,如果它还没有完成,就轮询并试图完成它.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 从任务自身创建一个 `LocalWaker`.
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` 是 `Pin<Box<dyn Future<Output = T> + Send + 'static>>` 的类型别名.
                // 我们可以调用 `Pin::as_mut` 方法获得 `Pin<&mut dyn Future + Send + 'static>`.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // 我们还没有完成对 `future` 的处理,所以把它再次
                    // 放回它的任务中,以便在某个时段再次运行.
                    *future_slot = Some(future);
                }
            }
        }
    }
}
#}

恭喜!我们现在有一个能干活的future执行器了。我们甚至能用它来运行async/.await代码和定制 的future,例如我们前面写的TimeFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // 在定时器之前和之后创建一个要输出的任务.
    spawner.spawn(async {
        println!("howdy!");
        // 定时器在两秒钟之后完成.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // 释放这个 `spawner`,以便让我们的执行程序知道它已经工作
    // 完成,并且不会接收到更多要运行的任务传入.
    drop(spawner);

    // 运行执行器,直到任务队列为空.
    // 这将输出 "howdy!", 等待一会, 然后输出 "done!".
    executor.run();
}

执行器与系统IO

在前面Future特质小节,我们讨论了一个对socket进行异步读取的future例子:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // `socket` 有数据的时候将其读取并放置在缓冲区并返回.
            Poll::Ready(self.socket.read_buf())
        } else {
            // `socket` 还没有数据.
            //
            // 当数据来到,将调用 `wake`.
            // 这个 `future` 的调用者将知道何时调用 `poll` 并接收数据.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

这个future会读取socket上的可用数据,如果没有数据可用,那它会让出执行器,请求在socket 再次可读时唤醒它的任务。然而,从这个例子里我们不能很清楚知道Socket类型是如何实现的, 尤其是不了解set_readable_callback函数如何工作。我们要怎样安排lw.wake()方法在一旦 socket变得可用时调用一次呢?一个方案就是让一个线程持续不断地检查socket是否可读,然后 可读时马上调用wake()。然而,这样子效率太低了,那对于每一个阻塞IO的future我们都需要 独立的线程。这会大大降低我们异步代码的效率。

实践时,这个问题是通过整合IO感知系统阻塞元件(IO-aware system blocking primitive), 像Linux上的epoll, FreeBSD和Mac OS的kqueue, Windows的IOCP,以及Fuchsia的poart (以上这些都通过了跨平台Rust库mio暴露出来)。这个元件全都允许线程阻塞多个异步IO事件, 一旦这些事件中有一个完成了,元件就会返回。实际上,这些API通常看着像这样:


# #![allow(unused_variables)]
#fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { ... }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { ... }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
#}

future执行器能够用这些元件来提供异步IO对象,例如可配置回调在特定IO时间出现时执行的socket。 像上面我们的SocketRead例子,Socket::set_readable_callback函数可能看起来像以下伪代码:


# #![allow(unused_variables)]
#fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
#}

我们现在只需要一个执行器线程来接收并分发任何IO事件给特定Waker,这些Waker会唤醒相应 任务,允许执行器在返回来检查更多IO事件之前,驱动更多任务完成。

async/.await

第一章,我们简单介绍了async/.await,并且用它构建一个简单的服务器。这一章会详细 讨论async/.await,解释它如何工作以及async代码如何和传统Rust程序不同。

async/.await是特殊的Rust语法,使得让出当前线程控制权成为可能,而不是阻塞它,也允许 其他代码在等待一个车操作完成时取得进展。

有两种主要的方法使用async: async fnasync块。两种方法都返回一个实现了Future 特质的值:


# #![allow(unused_variables)]

#fn main() {
// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型.
// `foo().await` 将返回类型为 `u8` 的值.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // 这个 `async` 区域返回一个实现了 `Future<Output = u8>` 的类型.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
#}

就像我们在第一章中看到,async体以及其他future类型是惰性的:除非它们运行起来,否则它们 什么都不做。运行Future最常见的方法是.await它。当.awaitFuture上调用时,它会 尝试把future跑到完成状态。如果Future被阻塞了,它会让出当前线程的控制权。能取得进展时, 执行器就会捡起这个Future并继续执行,让.await求解。

async生命周期

和传统函数不同,async fn会获取引用以及其他'static生命周期参数,并返回被这些参数的 生命周期约束的Future


# #![allow(unused_variables)]
#fn main() {
// 这是一个 `async` 函数:
async fn foo(x: &u8) -> u8 { *x }

// 相当于这个普通函数:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}
#}

这意味着这些future被async fn函数返回后必须要在它的非'static参数仍然有效时.await。 在通常的场景中,future在函数调用后马上.await(例如foo(&x).await),并不会有 大问题。然而,如果储存了这些future或者把它发送到其他的任务或者线程,那就有问题了。

一个常用的规避方法以把带有引用参数的async fn转化成一个'staticfuture是把这些参数 和应用的async fn函数调用封装到async块中:


# #![allow(unused_variables)]
#fn main() {
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}
#}

通过移动参数到async块中,我们把它的生命周期扩展到了匹配调用foo函数返回的Future的 生命周期。

async move

async块和闭包允许使用move关键字,这和普通的闭包一样。一个async move块会获取 所指向变量的所有群,允许它超长存活(outlive)当前作用域,但是放弃了与其他代码共享这些 变量的能力:


# #![allow(unused_variables)]
#fn main() {
/// `async` 区域:
///
/// 多个 `async` 区域可以访问相同的本地变量,
/// 只要它们在变量的作用域内执行.
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // 运行两个 `future`,输出两次 "foo":
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` 区域:
///
/// 只有一个 `async move` 区域可以访问同一个被捕获的变量, 
/// 因为被捕获的变量已经移动到 `async move` 生成的 `future` 中:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}
#}

在多线程执行器中.await

提醒一下,在使用多线程的Future执行器时,一个Future可能在线程间移动,所以任何在 async体重使用的变量必须能够穿过线程,所以任何.await都有可能导致切换到新线程。

这意味着使用Rc&RefCell或者其他没有实现Send特质的类型是不安全的,包括那些指向 没有Sync特质类型的引用。

(告示:使用这些类型是允许的,只要他们不是在调用.await的作用域内。)

类似的,横跨.await持有一个非future感知的锁这种做法是很不好的,因为它能导致整个线程池 锁上:一个任务可能获得了所,.await然后让出到执行器,允许其他任务尝试获取所并导致死锁。 为了避免这种情况,使用futures::lock里的Mutex类型比起std::sync更好。

固定(Pinning)

为了轮询future,future首先要用特殊类型Pin<T>来固定。如果你读了前面《执行Future与 任务小节中关于Future退出的解释,你会从Future::poll方法的定义中认出Pin。但这意味 什么?我们为什么需要它?

为什么需要固定

固定保证对象永不移动。为了理解这为什么必须,我们回忆一下async/.await怎么工作吧。 考虑以下代码:


# #![allow(unused_variables)]
#fn main() {
let fut_one = ...;
let fut_two = ...;
async move {
    fut_one.await;
    fut_two.await;
}
#}

这段代码实际上创建了一个实现了Future特质的匿名类型,提供了poll方法,如下:


# #![allow(unused_variables)]
#fn main() {
// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}
#}

poll第一次调用时,它会轮询fut_one。如果fut_one不能完成,那么 AsyncFuture::poll就会返回。调用poll的Future会从上次中断的地方继续。这个过程会持续 到future成功完成。

然而,如果我们在async块中用了引用呢?例如:


# #![allow(unused_variables)]
#fn main() {
async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}
#}

这会编译成什么结构呢?


# #![allow(unused_variables)]
#fn main() {
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
#}

这里,ReadIhtoBuffuture持有了一个指向其他字段x的引用。然而,如果AsyncFuture被 移动了,x的位置(location)也会被移走,使得存储在read_into_buf_fut.buf的指针失效。

固定future到内存特定位置则阻止了这种问题,让创建指向async块的引用变得安全。

如何固定?

Pin类型包装了指针类型,保证了指针指向的值不会被移走。例如,Pin<&mut T>Pin<&T>Pin<Box<T>>全都保证了T不会被移走。

多数类型被移走也不会有问题。这些类型实现了Unpin特质。指向Unpin类型的指针能够自由地 放进Pin,或取走。例如,u8Unpin的,所以Pin<&mut T>的行为就像普通的&mut T

一些函数需要他们协作的future是Unpin的。为了让这些函数使用不是UnpinFutureStream,你首先需要这个值固定,要么用Box::pin(创建Pin<Box<T>>)要么使用 pin_utils::pin_mut!(创建Pin<&mut T>)。Pin<Box<Fut>>Pin<&mut Fut>都能 用作future,并且都实现了Unpin

例如:


# #![allow(unused_variables)]
#fn main() {
use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }

let fut = async { ... };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { ... };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK
#}

Steam特质

Stream特质与Future类似,但能在完成前返还(yield)多个值,与标准库中的Iterator 类似:


# #![allow(unused_variables)]
#fn main() {
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
#}

一个常见的使用Stream的例子是futures库中通道的Receiver。每次Sender端发送一个值 时,它就会返回一个Some(val),并且会在Sender关闭且所有消息都接收后返还None:


# #![allow(unused_variables)]
#fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
#}

迭代与并发

与同步的Iterator类似,有很多不同的方法可以迭代处理Stream中的值。有很多组合子风格 的方法,如mapfilterfold,以及它们的“遇错即断”版本try_maptry_filtertry_fold

不幸的是,for循环不能用在Stream上,但是对于命令式编程风格(imperative style)的 代码,while let以及next/try_next函数还可以使用:


# #![allow(unused_variables)]
#fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
#}

然而,如果我们每次只处理一个元素,我们就要是去并发的机会,毕竟这是我们编写异步代码的首要 目的。为了并发处理一个Stream的多个值,使用for_each_concurrenttry_for_each_concurrent方法:


# #![allow(unused_variables)]
#fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
#}

同时执行多个Future

直到现在,我们几乎只用.await来执行future,而这会阻塞并发任务,直到特定的Future完成。 然而,真实的异步应用经常需要并发执行几个不同的操作。

这一章,我们会覆盖一些同事执行多个异步操作的方法:

  • join!:等待所有future完成
  • select!:等待其中一个future完成
  • 开辟(Spawning): 创建顶层任务,运行future至完成
  • FuturesUnordered: 一组返还子future的future

join!

futures::join宏等待并发执行的多个不同future完成。

当进行多个异步操作时,可以简单地用.await串行执行:


# #![allow(unused_variables)]
#fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
#}

然而,这实际上比必要的慢,因为我们不必在get_book完成后再get_music。在其它编程语言 中,future是运行至完成的,所以两个操作可以通过先调起async fn来启动future,然后再分别 await他们来并发操作: However, this will be slower than necessary, since it won't start trying to get_music until after get_book has completed. In some other languages, futures are ambiently run to completion, so two operations can be run concurrently by first calling the each async fn to start the futures, and then awaiting them both:


# #![allow(unused_variables)]
#fn main() {
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
#}

然而,Rust future不会干任何事情,除非它们已经.await了。这意味着上面这两段代码都会串行 执行book_futuremusic_future而非并发执行。为了正确地并发这两个future,使用 futures::join!


# #![allow(unused_variables)]
#fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
#}

join!返回值是包含每个传入future的输出的元组。

try_join!

对于那些返回Result的future,考虑使用try_join!而非join。因为join只会在所有子 future都完成后才会完成,它甚至会在子future返回Err之后继续处理。

join!不同,try_join!会在其中的字future返回错误后立即完成。


# #![allow(unused_variables)]
#fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
#}

注意,传进try_join!的future必须要用相同的错误类型。考虑使用 futures::future::TryFutureExt库的.map_err(|e| ...)err_into()函数来统一 错误类型:


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
#}

select!

futures::select宏同时跑多个future,允许用户在任意future完成时响应:


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
#}

上面的函数会并发跑t1t2。当t1t2结束时,对应的句柄(handler)会调用println!, 然后函数就会结束而不会完成剩下的任务。

select的基本格式为<pattern> = <expression> => <code>,,可以重复你想select的 任意多future。

default => ... and complete => ...

select也支持defaultcomplete分支。

default会在没有被select的future完成时执行,因此,带有default分支的select总是 马上返回,因为default会在没有其它future准备好的时候返回。

complete分支则用来处理所有被select的future都完成并且不需进一步处理的情况。这在循环 select时很好用:


# #![allow(unused_variables)]
#fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
#}

UnpinFusedFuture交互

你会注意到,在上面第一个例子中,我们在两个async fn函数返回的future上调用了.fuse(), 然后用pin_mut来固定他们。这两个调用都是必需的,用在select中的future必须实现UnpinFusedFuture

需要Unpin是因为select是用可变引用访问future的,不获取future的所有权。未完成的future 因此可以在select调用后继续使用。

类似的,需要FusedFuture是因为select一定不能轮询已完成的future。FusedFuture用来 实现追踪(track)future是否已完成。这种使得在循环中使用select成为客官您,只轮询尚未 完成的future。这可以从上面的例子中看出,a_futb_fut可能会在第二次循环的时候已经 完成了。因为future::ready返回的future实现了FusedFuture,所以select可以知道不必 再次轮询它了。

注意,stream也有对应的FusedStream特质。实现了这个特质或者被.fuse()包装的Stream会 从它们的.next/try_next()组合子中返还FusedFutre


# #![allow(unused_variables)]
#fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
#}

带有FuseFuturesUnorderedselect循环中的并发任务

有个不太好找但是很趁手的函数叫Fuse::terminated()。这个函数允许构造已经被终止的空 future,并且能够在之后填进需要运行的future。 这个在一个任务需要select循环中运行但是它本身是在select循环中创建的场景中很好用。

注意.select_next_some()函数的是使用。这可以用在select上,并且只运行从stream返回的 Some(_)值而忽略None


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
#}

当有很多份相同future的拷贝同时执行时,使用FutureUnordered类型。下面的例子和上面的 例子很类似,但会运行run_on_new_num_fut的所有拷贝都到完成状态,而不是当一个新拷贝 创建时就中断他们。它也会打印run_on_new_num_fut的返回值:


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

#}

需要知道并喜爱上的规避方法

Rust的async支持还是相当新的,所以还有一大堆大家都想要的特性还在开发,以及一些不太用到 分析技术。这章我们会讨论一些常见的痛点并解释如何规避它们。

返回类型错误

典型的Rust函数中,返回值类型错误会导致如下错误:

error[E0308]: mismatched types
 --> src/main.rs:2:12
  |
1 | fn foo() {
  |           - expected `()` because of default return type
2 |     return "foo"
  |            ^^^^^ expected (), found reference
  |
  = note: expected type `()`
             found type `&'static str`

然而,当前的async fn支持不知道要“相信”函数签名中写到的返回类型,导致不支配或者不合理 的错误。例如,函数async fn foo() {"foo" }会导致这样的错误:

error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
 --> src/lib.rs:1:16
  |
1 | async fn foo() {
  |                ^ expected &str, found ()
  |
  = note: expected type `&str`
             found type `()`
  = note: the return type of a function must have a statically known size

这个错误是说期望&str但是却发现()类型,而实际上应该是要反过来。这是因为编译器错误地 相信函数体返回的才是正确的类型。

绕过这个问题的就是认出错误中指出的,带有”expected SomeType, found OtherType"信息 的函数签名,这统一意味着有一个或者多个返回是错误的。

这个问题的修复在这个bug里跟踪。

Box<dyn Trait>

类似的,因为函数签名中的返回类型没有正确地传递下去,async fn的返回值会不准确地解析为 他们的期望类型。

实践中,这意味着从async fn返回Box<dyn Trait>对象需要手动地从Box<MyType>类型 as声明为Box<dyn Trait>类型。

以下代码会报错:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo")
}

这个问题能够用手工as声明的方法规避:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo") as Box<dyn std::fmt::Display>
}

这个问题的修复在这个bug里跟踪。

? in async Blocks

就像在async fn中,在async块中使用?也很常见。然而,async块的返回值类型并没有 显式说明。这会导致编译器无法推断(infer)async块的错误类型。

例如,以下代码:


# #![allow(unused_variables)]
#fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
#}

会触发以下错误:

error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type

很不行地,目前没有版本来"give fut a type",也没有显式指定async块返回值类型的方法。 要规避这个问题,使用“多宝鱼(turbofish)”操作符来提供async块的成功与错误类型:


# #![allow(unused_variables)]
#fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
#}

Send模拟

一些async fn状态机是可以安全地跨线程传递(Send)的,但另外的不可以。一个async fnFuture是否Send取决于是否有非Send类型跨越.await点被持有了。当编译器发现 有些值可能会跨.await持有时。编译器尽可能地模拟Send,但是这种分析今天在一些地方过于 保守。

例如,考虑一个简单的非Send类型,可能是一种持有Rc的类型:


# #![allow(unused_variables)]
#fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
#}

类型NotSend的变量可能会很简单地作为临时变量出现在async fn函数中,甚至会出现在 async fn函数返回的Future类型必须是Send的时候:

async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

然而,如果我们改动foo来存一个NotSend变量,这个例子就不再编译了:


# #![allow(unused_variables)]
#fn main() {
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
#}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

这个错误是正确的。如果我们把x存到变量中去,它不会被丢弃(drop),直到.await之后, 这时async fn可能在另外一个线程中运行。因为Rc不是Send的,允许它穿过线程是不合理的。 一个简单的解决方法是应该在.await之前drop掉这个Rc,但是不幸的是现在这种方法还不能 工作。

为了规避这个问题,你可能需要引入一个块作用域来封装任何非Send变量。这会让编译器更容易 发现这些变量不会存活超过.await点。


# #![allow(unused_variables)]
#fn main() {
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
#}

递归

在内部,async fn创建了一个包含了要.await的子Future的状态机。这样递归的async fn 有点诡异,因为结果的状态机必须包含它自身:


# #![allow(unused_variables)]
#fn main() {
// This function:
async fn foo() {
    step_one().await;
    step_two().await;
}
// generates a type like this:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// So this function:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// generates a type like this:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
#}

这不会工作——我们创建了大小为无限大的类型! 编译器会抱怨:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

为了允许这种做法,我们需要用Box来间接调用。而不幸的是,编译器限制意味着把recursive() 的调用包裹在Box::pin并不够。为了让递归调用工作,我们必须把recursive转换成非async 函数,然后返回一个.boxed()的异步块


# #![allow(unused_variables)]
#fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
#}

特质中的async

目前,async fn不能在特质中使用。原因一些复杂,但是有计划在未来移除这个限制。

不过,这个问题可以用crates.io的async_trait 来规避。

注意,这些特质方法会导致每个函数调用都需要分配堆内存。这可能对于大部分应用都不是特别严重 的开销,但是在决定是否要把这个功能作为底层函数的公共API,尤其这个函数可能每秒调用上百万次时 则需要多加考虑。