并发处理链接
现在我们代码的问题,是 listener.incoming()
是一个阻塞的迭代器。执行器不能在 listener
等待接入连接时运行其他 future,使得我们不能处理一个新连接,直到我们处理完前一个连接。
为了修复这个问题,我们要转化 listener.incoming()
,从阻塞迭代器转换成非阻塞的流。流类似于迭代器,但是会异步地被消耗。更详细的请查看关于流的章节.
我们来把阻塞的 std::net::TcpListener
替换成非阻塞的 async_std::net::TcpListener
,并且将我们的连接处理器更新为接受 async_std::net::TcpStream
:
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
这异步版本的 TcpListener
为了能使用 listener.incoming
,实现了Stream
trait,这个改动有两个好处:首先,listener.incoming()
不再阻塞执行器了,执行器能够在没有其他接入的 TCP 连接需要处理时,让给其他还在等在的 future 对象继续执行。
第二个好处是,来自的流的元素能可选地被并发处理,通过流的 for_each_concurrent
方法。这里,我们会重复利用这个方法来并发处理每一个接入的请求。我们需要引入 futures
库的 Stream
trait,所以我们的 Cargo.toml 现在看起来像这样:
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
现在,我们能并发地处理每一个连接了,只要我们把 handle_connection
传递给一个闭包函数。这个闭包函数获取了每一个 TcpStream
的所有权,然后一旦 TcpStream
可用就尽快执行。只要我们的 handle_connection
不阻塞,一个慢请求就不会组织其他请求完成了。
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
并行地服务请求
我们的例子现在能够提供极大的并发了(通过使用异步代码),作为并行(使用线程)的替代方案。然而,异步代码和线程不是二者只得其一。在我们 的例子中, for_each_concurrent
并发地处理每一个连接,但不是在同一个线程。 async-std
库也允许我们生成任务到一个分离开的线程。 因为 handle_connection
既是 Send
的 也是非阻塞的,所以使用 async_std::task::spawn
是安全的。现在代码看起来会像这样子:
use async_std::task::spawn; #[async_std::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); listener .incoming() .for_each_concurrent(/* limit */ None, |stream| async move { let stream = stream.unwrap(); spawn(handle_connection(stream)); }) .await; }
现在我们同时使用了并发和并行,来同时处理多个请求!更详细的请查看 关于多线程执行器的小节。