迭代与并发

与同步的 Iterator 类似,有很多不同的方法可以迭代处理 Stream 中的值。有很多组合子风格的方法, 如 mapfilterfold,以及它们的“遇错即断”版本 try_maptry_filtertry_fold

不幸的是,for 循环不能用在 Stream 上,但是对于命令式编程风格(imperative style)的代码, while let 以及 next/try_next 函数还可以使用:


#![allow(unused)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // 对于 `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // 对于 `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

然而,如果我们每次只处理一个元素,我们就会失去并发的机会,而这又是我们编写异步代码的首要目的。 为了并发处理一个 Stream 的多个值,使用 for_each_concurrenttry_for_each_concurrent 方法:


#![allow(unused)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // 对于 `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}