From 29c0ae40f9cd99b99c5590b3a3e3a08c607a56a6 Mon Sep 17 00:00:00 2001 From: kazeno Date: Tue, 11 Mar 2025 18:49:37 +0800 Subject: [PATCH] update ch17-04 --- README.md | 2 +- src/ch17-04-streams.md | 127 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 113 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index d6546b9..ee0c261 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ PS: ## 校对 -部分翻译采用 ChatGPT 4o 进行翻译校对。提示词详见 [proofreading_prompt.md](proofreading_prompt.md) +部分章节采用 ChatGPT 4.5 进行翻译校对。提示词详见 [proofreading_prompt.md](proofreading_prompt.md) ## 静态页面构建与文档撰写 diff --git a/src/ch17-04-streams.md b/src/ch17-04-streams.md index 39563cd..6872514 100644 --- a/src/ch17-04-streams.md +++ b/src/ch17-04-streams.md @@ -1,14 +1,14 @@ -## 流(Streams) +## 流(Streams):顺序的 Futrues > [ch17-04-streams.md](https://github.com/rust-lang/book/blob/main/src/ch17-04-streams.md) >
-> commit f04d20fe8d1a49c3bffa10a3086c58e527ff0a90 +> commit c7edf19e58701f894b4d906a6f7bd738ad4de801 -到本章的目前为止,我们大部分时间停留在独立的 future 上。一个重要的例外就是我们用过的异步信道。回忆一下在本章之前的 [“消息传递”][17-02-messages] 中我们如何使用异步信道接收端的。异步 `recv` 方法随着时间的推移产生一个序列的项。这是一个通用的多的模式的实例,通常被称为 *流*(*stream*)。 +到本章的目前为止,我们大部分时间都专注于单个的 future 上。一个重要的例外就是我们用过的异步信道。回忆一下在本章之前的 [“消息传递”][17-02-messages] 中我们如何使用异步信道接收端的。异步 `recv` 方法随着时间的推移产生一个序列的项。这是一个更通用的模式的实例,通常被称为 *流*(*stream*)。 -一个序列的项是我们之前是见过的,回忆一下第十三章的 `Iterator` trait,不过迭代器和异步信道接收端有两个区别。第一个区别是时间的维度:迭代器是同步的,而信道接收端是异步的。第二个区别是 API。当直接处理 `Iterator` 时,我们会调用其同步 `next` 方法。对于这个特定的 `trpl::Receiver` 流,我们调用一个异步的 `recv` 方法。不过这两个 API 看起来非常相似。 +我们之前在第十三章的 [Iterator trait 和 `next` 方法][iterator-trait] 部分已经见过项的序列,不过迭代器和异步信道接收端有两个区别。第一个区别是时间维度:迭代器是同步的,而信道接收端是异步的。第二个区别是 API。当直接处理 `Iterator` 时,我们会调用其同步 `next` 方法。对于这个特定的 `trpl::Receiver` 流,我们调用一个异步的 `recv` 方法。除此之外,这两种 API 在使用上感觉十分相似,这种相似性并非巧合。流类似于一种异步形式的迭代器。不过鉴于 `trpl::Receiver` 专门等待接收消息,多用途的流 API 则更为通用:它像 `Iterator` 一样提供了下一个项,但采用异步的方式。 -这种相似性并非巧合。流类似于一种异步形式的迭代器。不过鉴于 `trpl::Receiver` 专门等待接收消息,多用途的流 API 则更为通用:它像 `Iterator` 一样提供了下一个项,不过是异步版本的。Rust 中迭代器和流的相似性意味着我们实际上可以从任何迭代器上创建流。就迭代器而言,可以通过调用其 `next` 方法并 await 输出来使用流,如示例 17-30 所示。 +Rust 中迭代器和流的相似性意味着我们实际上可以从任何迭代器上创建流。就迭代器而言,可以通过调用其 `next` 方法并 await 输出来使用流,如示例 17-30 所示。
@@ -22,9 +22,9 @@
-我们以一组数字作为开始,将其转换为一个迭代器并接着调用 `map` 将其所有值翻倍。然后使用 `trpl::stream_from_iter` 函数将迭代器转换为流。再然后在 `while let` 循环中到达时循环处理流中的项。 +我们以一组数字作为开始,将其转换为一个迭代器并接着调用 `map` 将其所有值翻倍。然后使用 `trpl::stream_from_iter` 函数将迭代器转换为流。随后,我们使用 `while let` 循环在项到达时对流中的每个项进行循环处理。 -不幸的是当我们尝试运行代码时,代码无法编译。相反如果我们观察其输出,它会报告没有可用的 `next` 方法。 +遗憾的是当我们尝试运行代码时,代码无法编译,而是报告没有可用的 `next` 方法。 + +```text +Message: 'a' +Message: 'b' +Message: 'c' +Message: 'd' +Message: 'e' +Message: 'f' +Message: 'g' +Message: 'h' +Message: 'i' +Message: 'j' +``` + +虽然再一次,我们可以使用常规的 `Receiver` API 甚至是 `Iterator` API 来做到这些,所以让我们增加一个需要流的功能:增加一个适用于流中所有项的超时,和一个发送项的延时,如示例 17-34 所示。 + +
+ +文件名:src/main.rs + +```rust +{{#rustdoc_include ../listings/ch17-async-await/listing-17-34/src/main.rs:timeout}} +``` + +
示例 17-34:使用 `StreamExt::timeout` 方法为流中的项设置时限
+ +
+ +我们通过 `timeout` 方法在流上增加超时来作为开始,它来自 `StreamExt` trait。接着我们更新 `while let` 循环体,因为现在流返回一个 `Result`。`Ok` 变体表明消息及时到达;`Err` 变体表明任何消息到达前就触发超时了。我们 `match` 其结果要么在成功接收时打印消息要么打印一个超时的提示。最后,注意我们在加上超时之后 pin 了这些消息,因为超时助手函数产生了一个需要 pin 住才能拉取的流。 + +然后,因为消息之间没有延时,超时并不会改变程序的行为。让我们为发送的消息增加一个延时变量,如示例 17-35 所示。 + +
+ +文件名:src/main.rs + +```rust +{{#rustdoc_include ../listings/ch17-async-await/listing-17-35/src/main.rs:messages}} +``` + +
示例 17-35:通过 `tx` 和一个异步延时而不是将 `get_messages` 变成异步函数来发送消息
+ +
+ +在 `get_messages` 中,我们在 `messages` 数组上使用 `enumerate` 迭代器方法以便能够一起获得项本身和其索引。然后我们在偶数索引的项引入 100 毫秒的延时并为奇数索引的项引入 300 毫秒的延时来模拟真实世界的消息流中可能见到的不同的延时。因为我们的延时为 200 毫秒左右,这应该会影响一半的消息。 + +为了在 `get_messages` 函数中的消息之前休眠而不阻塞,我们需要使用异步。然而,我们不能将 `get_messages` 函数本身变为异步函数,因为这样它会返回一个 `Future>` 而不是 `Stream>`。调用者则不得不等待 `get_messages` 本身来获取流。不过请记住:在一个给定的 future 中的一切都是顺序发生的;并发发生在 futures **之间**。等待 `get_messages` 会要求其发送所有的消息,包括消息之间的休眠延时,在返回接收端流之前。其结果是,超时将毫无用处。流本身没有任何的延时;它们甚至全都发生在流可用之前。 + +相反,我们保持 `get_messages` 为一个返回流的常规函数,并产生一个任务来处理异步 `sleep` 调用。 + +> 注意:像这样调用 `spawn_task` 可以工作是因为我们已经设置了运行时;如果没有,则会造成 panic。其它的实现则选择了不同的权衡取舍:它们可能会产生一个新的运行时来避免 panic 不过最终会有一些额外开销,或者它们可能简单地在没有运行时的引用的情况下不提供一个独立的方式来产生任务。请务必理解你的运行时所选择的取舍来编写相应的代码! + +现在我们的代码有了一个更为有趣的结果。每隔一对消息会有一个 `Problem: Elapsed(())` 错误。 + + + +```text +Message: 'a' +Problem: Elapsed(()) +Message: 'b' +Message: 'c' +Problem: Elapsed(()) +Message: 'd' +Message: 'e' +Problem: Elapsed(()) +Message: 'f' +Message: 'g' +Problem: Elapsed(()) +Message: 'h' +Message: 'i' +Problem: Elapsed(()) +Message: 'j' +``` + +超时最终并不会阻止消息到达。我们仍然能够得到所有原始的消息,因为我们的信道是 **无限的**(**unbounded**):它可以存储内存所允许的所有消息。如果消息在超时之前没有到达,流处理器会做出反应,不过当再次拉取流时,消息现在可能已经到达了。 + +如果需要的话通过使用不同的信道或者其他更通用的流来得到不同行为。让我们看一个实际的通过结合一个时间间隔的流和这个消息流的例子。 + +### 合并流 + +首先,让我们创建另一个流,如果直接运行它的话它会每毫秒发送一个项。 + [17-02-messages]: ch17-02-concurrency-with-async.html#消息传递 +[iterator-trait]: ch13-02-iterators.html#the-iterator-trait-and-the-next-method