应用:构建执行器

Rust的Future是惰性的:它们不会干任何事,除非它们被驱动执行。一个驱动future类型的 方法是在async函数中使用.await调用,但这只是将问题抛到上一层:谁来跑在顶层async 函数返回的future实例呢?为此,我们需要执行Future的执行器。

Future执行器会拿一组顶层Future去跑poll方法,无论这些Future能否进展。通常, 执行器会poll一个future实例来启动。当Future通过调用wake()方法来指示他们准备好继续 进展,执行器就会把它们放入队列并再一次poll,重复这一过程直到Future完成。

在这一小节,我们要写一个我们的简单执行器,能够并发地运行大量的顶层future实例。

为此,我们需要依赖futures库的ArcWake特质,这个特质提供了构造Waker的简易方法。

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.17"

然后,我们在src/main.rs中引入以下:


# #![allow(unused_variables)]
#fn main() {
use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // 我们在上一章中写过的定时器:
    timer_future::TimerFuture,
};
#}

我们的执行器通过给通道(channel)发送任务来工作。执行器会从通道中拉取事件并执行它们。当 一个任务准备好进一步工作(被唤醒了)时,它会被放到channel的末尾,来让自己再次被调度。

在设计时,执行器自身只需要任务通道的接收端。用户会拿到发送端,那样它们就可以开辟(spawn) 新的future实例。任务自身仅仅是能够重新调度自身的future, 所以我们要把它们作为和发送端 配对的future存储。这个发送端能够让任务重新排队。


# #![allow(unused_variables)]
#fn main() {
/// 从管道中接收任务并运行它们的执行程序.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` 在任务管道中创建新的 `future`.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// 一个可以重新安排自己被 `Executor` 调用的 `future`.
struct Task {
    /// 正在运行的 `future` 应该被推进到运行完成.
    ///
    /// 这个 `Mutex` 不是必要的, 因为我们一次只有一个线程
    /// 执行任务,但是,Rust不够聪明,没有办法知道 `future`
    /// 只会在一个线程中发生变化,所以我们需要 `Mutex` 来
    /// 让Rust知道我们保证了跨线程之间的安全性.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 将任务放回到任务队列.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 允许通知在管道中排队的最大任务数.
    // 这只是为了让 `sync_channel` 满足, 并不会出现在真正的执行器中.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender})
}
#}

我们来加一个方法,让开辟器(spawner)更容易开辟新future吧。这个方法会获取一个future类型, 把它装箱并把它变成一个FutureObj对象,然后把这对象放到新的Arc<Task>里面。这个Arc<Task> 能够放到执行器的队列中。


# #![allow(unused_variables)]
#fn main() {
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}
#}

为了轮询future,我们需要创建Waker。正如在任务唤醒小节中讨论到,Waker负责调度任务 在wake函数调用时再次轮询。记住,Waker告诉执行器具体哪个任务已经准备好了,这使得它们 可以只轮询已经准备好的future。创建Waker的最简单方法是实现ArcWake特质,然后使用 waker_ref或者.into_waker()函数来把Arc<impl ArcWake>转变成Waker。我们来给 我们的任务实现ArcWake,以便它们可以变成Waker并且被唤醒:


# #![allow(unused_variables)]
#fn main() {
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 通过将这个任务发送回任务管道来实现 `wake`,
        // 以便让执行器再次轮询它.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}
#}

WakerArc<Task>创建了之后,调用wake()函数会拷贝一份Arc,发送到任务的通道去。 我们的执行器就会拿到这个任务并轮询它。我们来实现这个吧:


# #![allow(unused_variables)]
#fn main() {
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 以 `future` 为例子,如果它还没有完成,就轮询并试图完成它.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 从任务自身创建一个 `LocalWaker`.
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` 是 `Pin<Box<dyn Future<Output = T> + Send + 'static>>` 的类型别名.
                // 我们可以调用 `Pin::as_mut` 方法获得 `Pin<&mut dyn Future + Send + 'static>`.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // 我们还没有完成对 `future` 的处理,所以把它再次
                    // 放回它的任务中,以便在某个时段再次运行.
                    *future_slot = Some(future);
                }
            }
        }
    }
}
#}

恭喜!我们现在有一个能干活的future执行器了。我们甚至能用它来运行async/.await代码和定制 的future,例如我们前面写的TimeFuture

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // 在定时器之前和之后创建一个要输出的任务.
    spawner.spawn(async {
        println!("howdy!");
        // 定时器在两秒钟之后完成.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // 释放这个 `spawner`,以便让我们的执行程序知道它已经工作
    // 完成,并且不会接收到更多要运行的任务传入.
    drop(spawner);

    // 运行执行器,直到任务队列为空.
    // 这将输出 "howdy!", 等待一会, 然后输出 "done!".
    executor.run();
}