select!
futures::select
宏让用户可同时运行多个future,并在在任意future完成时做出响应。
#![allow(unused)] fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } }
上面的函数会并发运行t1
和t2
。只要其中一个完成了,对应的处理程序(corresponding handler)就会调用println!
,并在不执行剩余task的情况下直接结束。
select
的基本语法是<pattern> = <expression> => <code>,
,对每一个要select
的future重复即可。
default => ...
和 complete => ...
select
同样支持default
和complete
分支。
default
分支会在每个被select
的future都还没完成时执行。拥有default
分支的select
会因此总是立即返回,因为default
会在任一future都未就绪时执行。
complete
分支可用来处理所有被select
的future均完成且不需再运行的情况,经常是在select!
中循环时被用到。
#![allow(unused)] fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } }
与Unpin
和 FusedFuture
的交互使用
在上面的第一个例子中你可能注意到,我们不得不对这两个async fn
返回的future调用.fuse()
,并使用pin_mut
将其固定(pin)。之所以要这么做,是因为在select
中使用的future必须实现Unpin
和FusedFuture
两个trait。
由于select
接受的future参数并非是其值,而是其可变引用,因此需要Unpin
。而也正是因为没有获取future的所有权,在调用select
后未完成的future仍可以被重用。
类似地,若要让select
不在某个future已完成后,再对其进行轮询,该future就需要FusedFuture
trait。通过实现FusedFuture
,可以追踪future是否完成,因而可以在循环中使用loop
,且只要对尚未完成的future进行轮询就够了。通过上面的例子就能看出,a_fut
或b_fut
在第二次循环时应已完成。由于future::ready
返回的future实现了FusedFuture
,就能够告知select
不要再次对其进行轮询。
需要注意的是,Stream
也具有相应的FusedStream
trait。实现了FusedStream
或经.fuse()
封装的Stream会通过.next()
/.try_next()
组合子产生具有FusedFuture
trait的future。
#![allow(unused)] fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } }
使用Fuse
和FuturesUnordered
在select
循环中运行并发Task
有个存在感稍弱但是很好用的函数是Fuse::terminated()
,通过它可以构造一个已经终止(terminated)的空future,且之后可在其中填充我们要运行的内容。如果一个task要在select
循环中运行,而且在这个循环中才被创建,这个函数就有用了。
还有一个就是.select_next_some()
函数。它可以与select
共同使用,当Stream中返回的值为Some(_)
时才运行该分支,而忽略None
值。
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }
若同一future的多个拷贝需要同时运行,请使用FuturesUnordered
类型。下面的例子和上面的差不多,但是会将run_on_new_num_fut
的每个拷贝都运行至完成,而非在一份新的拷贝创建时就放弃执行。它会输出run_on_new_num_fut
的返回的一个值。
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }