迭代与并发
与同步Iterator
相似,有不少不同的迭代处理Stream
中值的方法。有像map
,filter
和fold
这种组合子风格(combinator-style)的方法,还有它们提供了出错时提早退出的(early-exit-on-error)的近亲try_map
,try_filter
和try_fold
。
不幸的是,不能对Stream
使用for
循环,但是对于命令式风格(imperative-style)的代码,是可以使用while let
和next
/try_next
这样的函数的:
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
但要是一次只处理一个元素,就失去了并发的机会,毕竟编写异步代码是第一位的。为了并发处理Stream中的多个内容,请使用for_each_concurrent
和try_for_each_concurrent
:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}