trpl-zh-cn/src/ch21-02-multithreaded.md

397 lines
29 KiB
Markdown
Raw Normal View History

2018-03-09 17:19:29 +08:00
## 将单线程 server 变为多线程 server
2024-10-20 23:44:05 +08:00
> [ch21-02-multithreaded.md](https://github.com/rust-lang/book/blob/main/src/ch21-02-multithreaded.md)
2018-03-09 17:19:29 +08:00
> <br>
2023-01-30 17:39:46 +08:00
> commit 98c6225e5fb8255349ec0dc235433530ed3fb534
2018-03-09 17:19:29 +08:00
目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
### 在当前 server 实现中模拟慢请求
2018-12-09 23:22:10 +08:00
让我们看看一个慢请求如何影响当前 server 实现中的其他请求。示例 20-10 通过模拟慢响应实现了 */sleep* 请求处理,它会使 server 在响应之前休眠五秒。
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/main.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,no_run
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-10/src/main.rs:here}}
2018-03-09 17:19:29 +08:00
```
2023-01-30 17:39:46 +08:00
<span class="caption">示例 20-10: 通过休眠五秒来模拟慢请求</span>
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
`if` 切换到 `match` 后现在有三个分支了。我们需要显式匹配一个 slice 的 `request_line` 以匹配字符串字面值的模式。`match` 不会像相等方法那样自动引用和解引用。
第一个分支与示例 20-9 中的 `if` 代码块相同。第二个分支匹配一个 */sleep* 请求。当接收到这个请求时server 在渲染成功 HTML 页面之前会先休眠五秒。第三个分支与示例 20-9 中的 `else` 代码块相同。
2018-03-09 17:19:29 +08:00
2019-06-19 14:39:12 +08:00
现在就可以真切的看出我们的 server 有多么的原始:真实的库将会以更简洁的方式处理多请求识别问题!
2018-03-09 17:19:29 +08:00
使用 `cargo run` 启动 server并接着打开两个浏览器窗口一个请求 *http://127.0.0.1:7878/* 而另一个请求 *http://127.0.0.1:7878/sleep* 。如果像之前一样多次请求 */*,会发现响应的比较快速。不过如果请求 */sleep* 之后再请求 */*,就会看到 */* 会等待直到 `sleep` 休眠完五秒之后才出现。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
有多种技术可以用来避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。
2018-03-09 17:19:29 +08:00
### 使用线程池改善吞吐量
**线程池***thread pool*)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配任务,这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,增加 server 的吞吐量。
2023-01-17 11:38:33 +08:00
我们会将池中线程限制为较少的数量以防拒绝服务Denial of ServiceDoS攻击如果程序为每一个接收的请求都新建一个线程某人向 server 发起千万级的请求时会耗尽服务器的资源并导致所有请求的处理都被终止。
2018-03-09 17:19:29 +08:00
不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向队列索取另一个请求。通过这种设计,则可以并发处理 `N` 个请求,其中 `N` 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
2018-03-09 17:19:29 +08:00
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 **fork/join 模型***fork/join model*)、**单线程异步 I/O 模型***single-threaded async I/O model*)或者**多线程异步 I/O 模型***multi-threaded async I/O model*)。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试实现它们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。
2018-03-09 17:19:29 +08:00
在开始之前,让我们讨论一下线程池应用看起来怎样。当尝试设计代码时,首先编写客户端接口确实有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。
2023-01-30 17:39:46 +08:00
类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发compiler-driven development。我们将编写调用所期望的函数的代码接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。不过在开始之前我们将探索不会作为起点的技术。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
#### 为每一个请求分配线程
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个可用的多线程 server 的起点。接着我们会增加线程池作为改进,这样比较两个方案将会更容易。示例 20-11 展示了 `main` 的改变,它在 `for` 循环中为每一个流分配了一个新线程进行处理:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/main.rs</span>
2018-03-09 17:19:29 +08:00
```rust,no_run
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-11/src/main.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-11: 为每一个流新建一个线程</span>
2018-12-09 23:22:10 +08:00
正如第十六章讲到的,`thread::spawn` 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 */sleep*,接着在另两个浏览器标签页中加载 */*,确实会发现 */* 请求不必等待 */sleep* 结束。不过正如之前提到的,这最终会使系统崩溃因为我们无限制的创建新线程。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
#### 创建有限数量的线程
2018-03-09 17:19:29 +08:00
我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出较大的修改。示例 20-12 展示我们希望用来替换 `thread::spawn``ThreadPool` 结构体的假想接口:
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/main.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,ignore,does_not_compile
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-12/src/main.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-12: 假想的 `ThreadPool` 接口</span>
这里使用 `ThreadPool::new` 来创建一个新的线程池,它有一个可配置的线程数的参数,在这里是四。这样在 `for` 循环中,`pool.execute` 有着类似 `thread::spawn` 的接口,它获取一个线程池运行于每一个流的闭包。`pool.execute` 需要实现为获取闭包并传递给池中的线程运行。这段代码还不能编译,不过通过尝试,编译器会指导我们如何修复它。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
#### 采用编译器驱动构建 `ThreadPool`
2018-03-09 17:19:29 +08:00
继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误:
2022-02-10 23:01:01 +08:00
```console
2024-10-20 23:44:05 +08:00
{{#include ../listings/ch21-web-server/listing-21-12/output.txt}}
2018-03-09 17:19:29 +08:00
```
好的,这告诉我们需要一个 `ThreadPool` 类型或模块,所以我们将构建一个。`ThreadPool` 的实现会与 web server 的特定工作相独立,所以让我们从 `hello` crate 切换到存放 `ThreadPool` 实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库而不仅仅是处理网络请求。
创建 *src/lib.rs* 文件,它包含了目前可用的最简单的 `ThreadPool` 定义:
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/src/lib.rs}}
2018-03-09 17:19:29 +08:00
```
2023-01-30 17:39:46 +08:00
接着编辑 *main.rs* 文件通过在 *src/main.rs* 的开头增加如下代码将 `ThreadPool` 从库 crate 引入作用域:
2018-03-09 17:19:29 +08:00
2023-02-11 23:46:35 +08:00
<span class="filename">文件名src/main.rs</span>
2018-03-09 17:19:29 +08:00
```rust,ignore
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/src/main.rs:here}}
2018-03-09 17:19:29 +08:00
```
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
2022-02-10 23:01:01 +08:00
```console
2024-10-20 23:44:05 +08:00
{{#include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/output.txt}}
2018-03-09 17:19:29 +08:00
```
2018-12-09 23:22:10 +08:00
这告诉我们下一步是为 `ThreadPool` 创建一个叫做 `new` 的关联函数。我们还知道 `new` 需要有一个参数可以接受 `4`,而且 `new` 应该返回 `ThreadPool` 实例。让我们实现拥有此特征的最小化 `new` 函数:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件夹src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/no-listing-02-impl-threadpool-new/src/lib.rs}}
2018-03-09 17:19:29 +08:00
```
2022-02-10 23:01:01 +08:00
这里选择 `usize` 作为 `size` 参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 `usize` 类型的原因,如第三章 [“整型”][integer-types] 部分所讲。
2018-03-09 17:19:29 +08:00
再次编译检查这段代码:
2022-02-10 23:01:01 +08:00
```console
2024-10-20 23:44:05 +08:00
{{#include ../listings/ch21-web-server/no-listing-02-impl-threadpool-new/output.txt}}
2018-03-09 17:19:29 +08:00
```
2024-01-10 18:19:44 +08:00
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 [“创建有限数量的线程”](#创建有限数量的线程) 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
我们会在 `ThreadPool` 上定义 `execute` 函数来获取一个闭包参数。回忆第十三章的 [“将被捕获的值移出闭包和 `Fn` trait”][fn-traits] 部分,闭包作为参数时可以使用三个不同的 trait`Fn`、`FnMut` 和 `FnOnce`。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 `thread::spawn`,所以我们可以观察 `thread::spawn` 的签名在其参数中使用了何种 bound。查看文档会发现
2018-03-09 17:19:29 +08:00
```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
2022-02-10 23:01:01 +08:00
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
2018-03-09 17:19:29 +08:00
```
`F` 是这里我们关心的参数;`T` 与返回值有关所以我们并不关心。考虑到 `spawn` 使用 `FnOnce` 作为 `F` 的 trait bound这可能也是我们需要的因为最终会将传递给 `execute` 的参数传给 `spawn`。因为处理请求的线程只会执行闭包一次,这也进一步确认了 `FnOnce` 是我们需要的 trait这里符合 `FnOnce``Once` 的意思。
`F` 还有 trait bound `Send` 和生命周期绑定 `'static`,这对我们的情况也是有意义的:需要 `Send` 来将闭包从一个线程转移到另一个线程,而 `'static` 是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 `F``ThreadPool``execute` 方法:
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/no-listing-03-define-execute/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
`FnOnce` trait 仍然需要之后的 `()`,因为这里的 `FnOnce` 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。
2018-12-09 23:22:10 +08:00
这里再一次增加了 `execute` 方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```console
2024-10-20 23:44:05 +08:00
{{#include ../listings/ch21-web-server/no-listing-03-define-execute/output.txt}}
2018-03-09 17:19:29 +08:00
```
现在就只有警告了!这意味着能够编译了!注意如果尝试 `cargo run` 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 `execute` 的闭包!
> 一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。这是一个提醒大家的好时机,实际上这并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 **并且** 拥有期望的行为。
#### 在 `new` 中验证池中线程数量
这里仍然存在警告是因为其并没有对 `new``execute` 的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 `new` 作为开始。之前选择使用无符号类型作为 `size` 参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 `usize` 值。让我们增加在返回 `ThreadPool` 实例之前检查 `size` 是否大于零的代码,并使用 `assert!` 宏在得到零时 panic如示例 20-13 所示:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-13/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-13: 实现 `ThreadPool::new``size` 为零时 panic</span>
2023-01-30 17:39:46 +08:00
这里也用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何!
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中示例 12-9 中 `Config::build` 那样将 `new` 更改为 `build` 并返回一个 `Result`,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的名为 `build` 的函数来对比一下 `new` 函数:
2018-03-09 17:19:29 +08:00
```rust,ignore
2023-01-30 17:39:46 +08:00
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
2018-03-09 17:19:29 +08:00
```
#### 分配空间以储存线程
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回结构体之前将它们储存在 `ThreadPool` 结构体中。不过如何 “储存” 一个线程?让我们再看看 `thread::spawn` 的签名:
2018-03-09 17:19:29 +08:00
```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
2022-02-10 23:01:01 +08:00
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
2018-03-09 17:19:29 +08:00
```
`spawn` 返回 `JoinHandle<T>`,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()`
2024-01-29 09:43:20 +08:00
示例 20-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 `ThreadPool` 的定义来存放一个 `thread::JoinHandle<()>` 的 vector 实例,使用 `size` 容量来初始化,并设置一个 `for` 循环来运行创建线程的代码,并返回包含这些线程的 `ThreadPool` 实例:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2018-12-09 23:22:10 +08:00
```rust,ignore,not_desired_behavior
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-14/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-14: 为 `ThreadPool` 创建一个 vector 来存放线程</span>
这里将 `std::thread` 引入库 crate 的作用域,因为使用了 `thread::JoinHandle` 作为 `ThreadPool` 中 vector 元素的类型。
2023-01-30 17:39:46 +08:00
在得到了有效的数量之后,`ThreadPool` 新建一个存放 `size` 个元素的 vector。`with_capacity` 函数与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
如果再次运行 `cargo check`,它应该会成功。
2018-03-09 17:19:29 +08:00
#### `Worker` 结构体负责从 `ThreadPool` 中将代码传递给线程
示例 20-14 的 `for` 循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。标准库提供的创建线程的方法,`thread::spawn`,它期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须自己实现。
2023-01-30 17:39:46 +08:00
我们将要实现的行为是创建线程并稍后发送代码,这会在 `ThreadPool` 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 *Worker*,这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。
2018-03-09 17:19:29 +08:00
2018-12-09 23:22:10 +08:00
不同于在线程池中储存一个 `JoinHandle<()>` 实例的 vector我们会储存 `Worker` 结构体的实例。每一个 `Worker` 会储存一个单独的 `JoinHandle<()>` 实例。接着会在 `Worker` 上实现一个方法,它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker `id`,这样就可以在日志和调试中区别线程池中的不同 worker。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
如下是创建 `ThreadPool` 时会发生的新过程。在通过如下方式设置完 `Worker` 之后,我们会实现向线程发送闭包的代码:
2018-03-09 17:19:29 +08:00
1. 定义 `Worker` 结构体存放 `id``JoinHandle<()>`
2. 修改 `ThreadPool` 存放一个 `Worker` 实例的 vector
3. 定义 `Worker::new` 函数,它获取一个 `id` 数字并返回一个带有 `id` 和用空闭包分配的线程的 `Worker` 实例
4.`ThreadPool::new` 中,使用 `for` 循环计数生成 `id`,使用这个 `id` 新建 `Worker`,并储存进 vector 中
如果你渴望挑战,在查示例 20-15 中的代码之前尝试自己实现这些修改。
准备好了吗?示例 20-15 就是一个做出了这些修改的例子:
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-15/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-15: 修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程</span>
这里将 `ThreadPool` 中字段名从 `threads` 改为 `workers`,因为它现在储存 `Worker` 而不是 `JoinHandle<()>`。使用 `for` 循环中的计数作为 `Worker::new` 的参数,并将每一个新建的 `Worker` 储存在叫做 `workers` 的 vector 中。
2023-01-30 17:39:46 +08:00
`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/main.rs* 中的 server并不需要知道关于 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>`
> 注意:如果操作系统因为没有足够的系统资源而无法创建线程时,`thread::spawn` 会 panic。这会导致整个 server panic即使一些线程可能创建成功了。出于简单的考虑这个行为是可行的不过在一个生产级别的线程池实现中你可能会希望使用 [`std::thread::Builder`][builder] 和其 [`spawn`][builder-spawn] 方法来返回一个 `Result`。
2018-03-09 17:19:29 +08:00
这段代码能够编译并用指定给 `ThreadPool::new` 的参数创建储存了一系列的 `Worker` 实例,不过 **仍然** 没有处理 `execute` 中得到的闭包。让我们聊聊接下来怎么做。
2022-04-22 22:44:07 +08:00
#### 使用信道向线程发送请求
2018-03-09 17:19:29 +08:00
下一个需要解决的问题是传递给 `thread::spawn` 的闭包完全没有做任何工作。目前,我们在 `execute` 方法中获得期望执行的闭包,不过在创建 `ThreadPool` 的过程中创建每一个 `Worker` 时需要向 `thread::spawn` 传递一个闭包。
我们希望刚创建的 `Worker` 结构体能够从 `ThreadPool` 的队列中获取需要执行的代码,并发送到线程中执行它们。
2018-03-09 17:19:29 +08:00
2022-04-22 22:44:07 +08:00
在第十六章,我们学习了 **信道** —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的。这里信道将充当任务队列的作用,`execute` 将通过 `ThreadPool` 向其中线程正在寻找工作的 `Worker` 实例发送任务。如下是这个计划:
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
1. `ThreadPool` 会创建一个信道并充当发送者。
2. 每个 `Worker` 将会充当接收者。
2022-04-22 22:44:07 +08:00
3. 新建一个 `Job` 结构体来存放用于向信道中发送的闭包。
2023-01-30 17:39:46 +08:00
4. `execute` 方法会在发送者发出期望执行的任务。
5. 在线程中,`Worker` 会遍历接收者并执行任何接收到的任务。
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
让我们以在 `ThreadPool::new` 中创建信道并让 `ThreadPool` 实例充当发送者开始,如示例 20-16 所示。`Job` 是将在信道中发出的类型,目前它是一个没有任何内容的结构体:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-16/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
2023-01-30 17:39:46 +08:00
<span class="caption">示例 20-16: 修改 `ThreadPool` 来储存一个传输 `Job` 实例的发送者</span>
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
`ThreadPool::new` 中,新建了一个信道,并接着让线程池在接收端等待。这段代码能够成功编译。
2018-03-09 17:19:29 +08:00
让我们尝试在线程池创建每个 worker 时将接收者传递给它们。须知我们希望在 worker 所分配的线程中使用接收者,所以将在闭包中引用 `receiver` 参数。示例 20-17 中展示的代码还不能编译:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2018-12-09 23:22:10 +08:00
```rust,ignore,does_not_compile
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-17/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
2022-04-22 22:44:07 +08:00
<span class="caption">示例 20-17: 将信道的接收端传递给 worker</span>
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
这是一些小而直观的修改:将接收者传递进了 `Worker::new`,并接着在闭包中使用它。
2018-03-09 17:19:29 +08:00
如果尝试 check 代码,会得到这个错误:
2022-02-10 23:01:01 +08:00
```console
2024-10-20 23:44:05 +08:00
{{#include ../listings/ch21-web-server/listing-21-17/output.txt}}
2018-03-09 17:19:29 +08:00
```
2023-01-30 17:39:46 +08:00
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的回忆第十六章Rust 所提供的信道实现是多 **生产者**,单 **消费者** 的。这意味着不能简单的克隆信道的消费端来解决问题。我们也不希望将一个消息向多个消费者发送多次;我们希望有一个消息列表和多个 worker 这样每个消息就只会处理一次。
2018-03-09 17:19:29 +08:00
2022-04-22 22:44:07 +08:00
另外,从信道队列中取出任务涉及到修改 `receiver`,所以这些线程需要一个能安全的共享和修改 `receiver` 的方式,否则可能导致竞争状态(参考第十六章)。
2018-03-09 17:19:29 +08:00
回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 `Arc<Mutex<T>>`。`Arc` 使得多个 worker 拥有接收端,而 `Mutex` 则确保一次只有一个 worker 能从接收端得到任务。示例 20-18 展示了所需的修改:
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-18/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
2023-01-30 17:39:46 +08:00
<span class="caption">示例 20-18: 使用 `Arc``Mutex` 在 worker 间共享接收者</span>
2018-03-09 17:19:29 +08:00
2023-01-30 17:39:46 +08:00
`ThreadPool::new` 中,将接收者放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收者的所有权了。
2018-03-09 17:19:29 +08:00
通过这些修改,代码可以编译了!我们做到了!
#### 实现 `execute` 方法
2024-10-20 23:44:05 +08:00
最后让我们实现 `ThreadPool` 上的 `execute` 方法。同时也要修改 `Job` 结构体:它将不再是结构体,`Job` 将是一个有着 `execute` 接收到的闭包类型的 trait 对象的类型别名。第二十章 [“类型别名用来创建类型同义词”][creating-type-synonyms-with-type-aliases] 部分提到过,类型别名允许将长的类型变短。观察示例 20-19
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-19/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
2022-04-22 22:44:07 +08:00
<span class="caption">示例 20-19: 为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在信道中发出任务</span>
2018-03-09 17:19:29 +08:00
在使用 `execute` 得到的闭包新建 `Job` 实例之后,将这些任务从信道的发送端发出。这里调用 `send` 上的 `unwrap`,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在它们就会一直执行。使用 `unwrap` 是因为我们知道失败不可能发生,即便编译器不这么认为。
2018-03-09 17:19:29 +08:00
不过到此事情还没有结束!在 worker 中,传递给 `thread::spawn` 的闭包仍然还只是 **引用** 了信道的接收端。相反我们需要闭包一直循环,向信道的接收端请求任务,并在得到任务时执行它们。如示例 20-20 对 `Worker::new` 做出修改:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```rust,noplayground
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-20/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
<span class="caption">示例 20-20: 在 worker 线程中接收并执行任务</span>
这里,首先在 `receiver` 上调用了 `lock` 来获取互斥器,接着 `unwrap` 在出现任何错误时 panic。如果互斥器处于一种叫做 **被污染***poisoned*)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 `unwrap` 使其 panic 是正确的行为。请随意将 `unwrap` 改为包含有意义错误信息的 `expect`
2022-04-22 22:44:07 +08:00
如果锁定了互斥器,接着调用 `recv` 从信道中接收 `Job`。最后的 `unwrap` 也绕过了一些错误,这可能发生于持有信道发送端的线程停止的情况,类似于如果接收端关闭时 `send` 方法如何返回 `Err` 一样。
2018-03-09 17:19:29 +08:00
2018-12-09 23:22:10 +08:00
调用 `recv` 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。`Mutex<T>` 确保一次只有一个 `Worker` 线程尝试请求任务。
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
现在线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求:
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
```console
2018-03-09 17:19:29 +08:00
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
2022-02-10 23:01:01 +08:00
warning: field is never read: `workers`
2018-03-09 17:19:29 +08:00
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
2022-02-10 23:01:01 +08:00
= note: `#[warn(dead_code)]` on by default
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
warning: field is never read: `id`
--> src/lib.rs:48:5
2018-03-09 17:19:29 +08:00
|
2022-02-10 23:01:01 +08:00
48 | id: usize,
2018-03-09 17:19:29 +08:00
| ^^^^^^^^^
2022-02-10 23:01:01 +08:00
warning: field is never read: `thread`
--> src/lib.rs:49:5
2018-03-09 17:19:29 +08:00
|
2022-02-10 23:01:01 +08:00
49 | thread: thread::JoinHandle<()>,
2018-03-09 17:19:29 +08:00
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-01-30 17:39:46 +08:00
warning: `hello` (lib) generated 3 warnings
2022-02-10 23:01:01 +08:00
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
2023-01-30 17:39:46 +08:00
Running `target/debug/hello`
2018-12-09 23:22:10 +08:00
Worker 0 got a job; executing.
2018-03-09 17:19:29 +08:00
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```
2018-12-09 23:22:10 +08:00
成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当 server 收到大量请求时系统也不会负担过重。如果请求 */sleep*server 也能够通过另外一个线程处理其他请求。
> 注意如果同时在多个浏览器窗口打开 */sleep*,它们可能会彼此间隔地加载 5 秒,因为一些浏览器出于缓存的原因会顺序执行相同请求的多个实例。这些限制并不是由于我们的 web server 造成的。
2018-03-09 17:19:29 +08:00
2024-10-20 23:44:05 +08:00
在学习了第十七章和第十八章的 `while let` 循环之后,你可能会好奇为何不能如此编写 worker 线程,如示例 20-21 所示:
2018-03-09 17:19:29 +08:00
2023-01-17 11:38:33 +08:00
<span class="filename">文件名src/lib.rs</span>
2018-03-09 17:19:29 +08:00
2018-12-09 23:22:10 +08:00
```rust,ignore,not_desired_behavior
2024-10-20 23:44:05 +08:00
{{#rustdoc_include ../listings/ch21-web-server/listing-21-21/src/lib.rs:here}}
2018-03-09 17:19:29 +08:00
```
2019-12-22 10:57:37 +08:00
<span class="caption">示例 20-21: 一个使用 `while let``Worker::new` 替代实现</span>
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:`Mutex` 结构体没有公有 `unlock` 方法,因为锁的所有权依赖 `lock` 方法返回的 `LockResult<MutexGuard<T>>``MutexGuard<T>` 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 `Mutex` 守护的资源,不过如果没有认真的思考 `MutexGuard<T>` 的生命周期的话,也可能会导致比预期更久的持有锁。
2018-03-09 17:19:29 +08:00
2022-02-10 23:01:01 +08:00
示例 20-20 中的代码使用的 `let job = receiver.lock().unwrap().recv().unwrap();` 之所以可以工作是因为对于 `let` 来说,当 `let` 语句结束时任何表达式中等号右侧使用的临时值都会立即被丢弃。然而 `while let``if let` 和 `match`)直到相关的代码块结束都不会丢弃临时值。在示例 20-21 中,`job()` 调用期间锁一直持续,这也意味着其他的 worker 无法接受任务。
2019-12-22 10:57:37 +08:00
[creating-type-synonyms-with-type-aliases]:
2024-10-20 23:44:05 +08:00
ch20-04-advanced-types.html#类型别名用来创建类型同义词
2022-02-10 23:01:01 +08:00
[integer-types]: ch03-02-data-types.html#整型
2023-01-30 17:39:46 +08:00
[fn-traits]: ch13-01-closures.html#将被捕获的值移出闭包和-fn-trait
[builder]: https://doc.rust-lang.org/std/thread/struct.Builder.html
[builder-spawn]: https://doc.rust-lang.org/std/thread/struct.Builder.html#method.spawn