select!

futures::select宏让用户可同时运行多个future,并在在任意future完成时做出响应。


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

上面的函数会并发运行t1t2。只要其中一个完成了,对应的处理程序(corresponding handler)就会调用println!,并在不执行剩余task的情况下直接结束。

select的基本语法是<pattern> = <expression> => <code>,,对每一个要select的future重复即可。

default => ...complete => ...

select同样支持defaultcomplete分支。

default分支会在每个被select的future都还没完成时执行。拥有default分支的select会因此总是立即返回,因为default会在任一future都未就绪时执行。

complete分支可用来处理所有被select的future均完成且不需再运行的情况,经常是在select!中循环时被用到。


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

UnpinFusedFuture的交互使用

在上面的第一个例子中你可能注意到,我们不得不对这两个async fn返回的future调用.fuse(),并使用pin_mut将其固定(pin)。之所以要这么做,是因为在select中使用的future必须实现UnpinFusedFuture两个trait。

由于select接受的future参数并非是其值,而是其可变引用,因此需要Unpin。而也正是因为没有获取future的所有权,在调用select后未完成的future仍可以被重用。

类似地,若要让select不在某个future已完成后,再对其进行轮询,该future就需要FusedFuturetrait。通过实现FusedFuture,可以追踪future是否完成,因而可以在循环中使用loop,且只要对尚未完成的future进行轮询就够了。通过上面的例子就能看出,a_futb_fut在第二次循环时应已完成。由于future::ready返回的future实现了FusedFuture,就能够告知select不要再次对其进行轮询。

需要注意的是,Stream也具有相应的FusedStreamtrait。实现了FusedStream或经.fuse()封装的Stream会通过.next()/.try_next()组合子产生具有FusedFuturetrait的future。


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

使用FuseFuturesUnorderedselect循环中运行并发Task

有个存在感稍弱但是很好用的函数是Fuse::terminated(),通过它可以构造一个已经终止(terminated)的空future,且之后可在其中填充我们要运行的内容。如果一个task要在select循环中运行,而且在这个循环中才被创建,这个函数就有用了。

还有一个就是.select_next_some()函数。它可以与select共同使用,当Stream中返回的值为Some(_)时才运行该分支,而忽略None值。


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

若同一future的多个拷贝需要同时运行,请使用FuturesUnordered类型。下面的例子和上面的差不多,但是会将run_on_new_num_fut的每个拷贝都运行至完成,而非在一份新的拷贝创建时就放弃执行。它会输出run_on_new_num_fut的返回的一个值。


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}