结合使用 future、任务和线程

ch17-06-futures-tasks-threads.md
commit 06d73f3935dfec895aec9790127dc8b6fc827ce1

正如我们在第十六章所见,线程提供了一种并发的方式。在这一章节我们见过了另一种方式:通过 future 和流来使用异步。如果你好奇何时选择一个而不是另一个,答案是:因情况而异!同时在很多场景下,选择不是线程异步而是线程异步。

几十年来很多操作系统已经提供了基于线程的并发模型,因此很多编程语言也支持它们。然而这些模型并非没有取舍。在很多操作系统中,它们为每一个线程使用了不少的内存,同时启动和停止带来了一些开销。线程也只有当你的操作系统和硬件支持它们的时候才是一个选项。不同于主流的台式机和移动电脑,一些嵌入式系统完全没有操作系统,所以它们也没有线程。

异步模型提供了一个不同的 -- 最终也是互补的 -- 权衡取舍。在异步模型中,并发操作并不需求自己的线程。相反,它们运行在任务上,正如流小节中我们用 trpl::spawn_task 从异步函数中开始工作一样。任务类似于线程,不过不同于由操作系统管理,它由库级别的代码管理:也就是运行时。

在上一小节,我们看到可以通过异步信道来构建一个流并产生一个可以在异步代码中调用的异步任务。我们也可以用线程来做到完全相同的事情。在示例 17-40 中使用了 trpl::spawn_tasktrpl::sleep。在示例 17-41 中,我们将 get_intervals 函数中的代码替换为标准库中的 thread::spawnthread::sleep API。

文件名:src/main.rs

extern crate trpl; // required for mdbook test

use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
示例 17-41:为 `get_intervals` 函数使用 `std::thread` API 而不是异步 `trpl` API

如果你运行这段代码,其输入与示例 17-40 的一样。并且请注意从调用代码的角度来说改变是多么的微小。而且,即便一个函数在运行时上产生一个异步任务而另一个产生一个系统线程,其返回的流不受该区别的影响。

尽管它们是相似的,这两种方式的行为非常不同,尽管在这个非常简单的例子中我们可能很难进行测量。我们可以在任意现代计算机中产生数以百万计的异步任务。如果尝试用线程来这样做,我们实际上会耗尽内存!

然而,它们的 API 如此相似是有理由的。线程作为同步操作集的边界;线程之间的并发是可能的。任务作为异步操作集的边界,任务之间之内的并发是可能的,因为任务可以在其内部切换 future。最后,future 是 Rust 中最细粒度的并发单位,同时每一个 future 可能代表一个其它 future 的数。其运行时 -- 更准确地说,其执行器(executor)-- 管理任务,任务则管理 future。在这一点上,任务类似于轻量的、运行时管理的线程,并带有额外的由运行时管理而不是操作系统管理的能力。

这并不意味着异步任务总是优于线程(或者相反)。基于线程的并发在某种程度上来说是一个比基于 async 的并发更简单的编程模型。这可以是一个优点也可以是一个缺点。线程有点像 “射后不理”(“fire and forget”);