Waker唤醒任务

future第一次轮询时没有执行完这事很常见。此时,future需要保证会被再次轮询以进展(make progress),而这由Waker类型负责。

每次future被轮询时, 它是作为一个“任务”的一部分轮询的。任务(Task)是能提交到执行器上 的顶层future。

Waker提供wake()方法来告诉执行器哪个关联任务应该要唤醒。当wake()函数被调用时, 执行器知道Waker关联的任务已经准备好继续了,并且任务的future会被轮询一遍。

Waker类型还实现了clone(),因此可以到处拷贝储存。

我们来试试用Waker实现一个简单的计时器future吧。

应用:构建计时器

这个例子的目标是: 在创建计时器时创建新线程,休眠特定时间,然后过了时间窗口时通知(signal) 计时器future。

这是我们开始时需要的导入:


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

我们开始定义future类型吧。 我们的future需要一个方法,让线程知道计时器倒数完了,future 应该要完成了。我们准备用Arc<Mutex<..>>共享值来为沟通线程和future。


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// 在 `future` 线程和 `awiting` 线程之间共享状态
struct SharedState {
    /// 是否已经达到休眠时间.
    completed: bool,

    /// `TimerFuture` 表示正在运行的 `waker`.
    /// 线程可以在设置完 `completed = true` 之后来通知 `TimerFuture` 任务被唤醒并
    /// 检查 `completed = true`,然后继续执行.
    waker: Option<Waker>,
}
#}

现在,我们来实现Future吧!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 检查共享状态,检查定时器是否已经完成.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置 `waker`, 让线程可以在定时器完成时唤醒当前 `waker`,确保
            // 再次轮询 `future` 并获知 `completed = true`.
            //
            // 这样做是非常不错的,而不用每次都重复 `clone` `waker`. 然而, 这个 `TimerFuture`
            // 可以在执行器之间移动, 这可能会导致旧的 `waker` 指向错误的 `waker`, 这会阻止 
            // `TimerFuture` 被正确得唤醒.
            //
            // 注意:可以使用 `Waker::will_wake` 函数来做检查, 但是
            // 为了简单起见,我们忽略了他.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

很简单,对吧?如果线程已经设置成shared_state.completed = true,我们就搞定了!否则, 我们从当前任务克隆Waker并把它传到shared_state.waker,这样线程就能回头再唤醒这个任务。

重要的是,每次future轮询后,我们必须更新Waker,这是因为这个future可能会移动到不同的 任务去,带着不同的Waker。这会在future轮询后在不同任务间移动时发生。

最后,我们需要API来构造计时器并启动线程:


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// 创建一个新的 `TimerFuture`,它将在提供的超时之后完成.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 创建一个新的线程.
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 设置状态,表示定时器已经完成,并唤醒轮询 `future` 中的最后一个
            // 任务 (如果存在的话).
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

哇!这些就是我们构建一个简单计时器future所需的内容了。现在,只要一个执行器(Executor) 执行这个future...