2018-03-09 17:19:29 +08:00
## 将单线程 server 变为多线程 server
> [ch20-02-multithreaded.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-02-multithreaded.md)
> <br>
> commit 1f0136399ba2f5540ecc301fab04bd36492e5554
<!-- Reading ahead, the original heading didn't seem to fit all of the sub
headings -- this might not be totally right either, so feel free to replace
with something more appropriate -->
<!-- This is fine! /Carol -->
目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
### 在当前 server 实现中模拟慢请求
让我们看看一个慢请求如何影响当前 server 实现中的其他请求。示例 20-10 通过模拟慢响应实现了 `/sleep` 请求处理,它会使 server 在响应之前休眠五秒。
< span class = "filename" > 文件名: src/main.rs< / span >
```rust
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--
fn handle_connection(mut stream: TcpStream) {
# let mut buffer = [0; 512];
# stream.read(&mut buffer).unwrap();
// --snip--
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
// --snip--
}
```
< span class = "caption" > 示例 20-10: 通过识别 `/sleep` 并休眠五秒来模拟慢请求</ span >
这段代码有些凌乱,不过对于模拟的目的来说已经足够!这里创建了第二个请求 `sleep` ,我们会识别其数据。在 `if` 块之后增加了一个 `else if` 来检查 `/sleep` 请求,当接收到这个请求时,在渲染成功 HTML 页面之前会先休眠五秒。
现在就可以真切的看出我们的 server 有多么的原始;真实的库将会以更简洁的方式处理多请求识别问题。
使用 `cargo run` 启动 server, 并接着打开两个浏览器窗口: 一个请求 `http://localhost:7878/` 而另一个请求 `http://localhost:7878/sleep` 。如果像之前一样多次请求 `/` ,会发现响应的比较快速。不过如果请求`/sleep` 之后在请求 `/` ,就会看到 `/` 会等待直到 `sleep` 休眠完五秒之后才出现。
这里有多种办法来改变我们的 web server 使其避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。
### 使用线程池改善吞吐量
<!-- There seems to be some repetition throughout these thread pool sections, is
there any way to condense it? I've edited with this in mind, but am wary of
changing too much -->
<!-- Your edits that removed repetition are fine! /Carol -->
**线程池**( *thread pool*)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配任务,这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,增加 server 的吞吐量。
2018-07-30 18:55:44 +08:00
我们会将池中线程限制为较少的数量, 以防拒绝服务( Denial of Service, DoS) 攻击; 如果程序为每一个接收的请求都新建一个线程, 某人向 server 发起千万级的请求时会耗尽服务器的资源并导致所有请求的处理都被终止。
2018-03-09 17:19:29 +08:00
不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向对队列索取另一个请求。通过这种设计,则可以并发处理 `N` 个请求,其中 `N` 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和单线程异步 I/O 模型。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试用 Rust 实现他们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。
在开始之前,让我们讨论一下线程池应用看起来怎样。当尝试设计代码时,首先编写客户端接口确实有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。
类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发( Compiler Driven Development) 。我们将编写调用所期望的函数的代码, 接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。
#### 为每一个请求分配线程的代码结构
首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个开始。示例 20-11 展示了 `main` 的改变,它在 `for` 循环中为每一个流分配了一个新线程进行处理:
< span class = "filename" > 文件名: src/main.rs< / span >
```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
< span class = "caption" > 示例 20-11: 为每一个流新建一个线程< / span >
正如第十六章讲到的,`thread::spawn` 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 `/sleep` ,接着在另两个浏览器标签页中加载 `/` ,确实会发现 `/` 请求不必等待 `/sleep` 结束。不过正如之前提到的,这最终会使系统崩溃因为我们无限制的创建新线程。
#### 为有限数量的线程创建一个类似的接口
我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出较大的修改。示例 20-12 展示我们希望用来替换 `thread::spawn` 的 `ThreadPool` 结构体的假想接口:
< span class = "filename" > 文件名: src/main.rs< / span >
```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
# fn new(size: u32) -> ThreadPool { ThreadPool }
# fn execute<F>(&self, f: F)
# where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
< span class = "caption" > 示例 20-12: 假想的 `ThreadPool` 接口</ span >
这里使用 `ThreadPool::new` 来创建一个新的线程池,它有一个可配置的线程数的参数,在这里是四。这样在 `for` 循环中,`pool.execute` 有着类似 `thread::spawn` 的接口,它获取一个线程池运行于每一个流的闭包。`pool.execute` 需要实现为获取闭包并传递给池中的线程运行。这段代码还不能编译,不过通过尝试编译器会指导我们如何修复它。
<!-- Can you be more specific here about how pool.execute will work? -->
<!-- So clarified. I hope this helps with some of the future confusion as well
/Carol -->
#### 采用编译器驱动构建 `ThreadPool` 结构体
继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
--> src\main.rs:10:16
|
10 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ Use of undeclared type or module
`ThreadPool`
error: aborting due to previous error
```
好的,这告诉我们需要一个 `ThreadPool` 类型或模块,所以我们将构建一个。`ThreadPool` 的实现会与 web server 的特定工作相独立,所以让我们从 `hello` crate 切换到存放 `ThreadPool` 实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库, 而不仅仅是处理网络请求。
创建 *src/lib.rs* 文件,它包含了目前可用的最简单的 `ThreadPool` 定义:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
pub struct ThreadPool;
```
接着创建一个新目录,*src/bin*,并将二进制 crate 根文件从 *src/main.rs* 移动到 *src/bin/main.rs* 。这使得库 crate 成为 *hello* 目录的主要 crate; 不过仍然可以使用 `cargo run` 运行 *src/bin/main.rs* 二进制文件。移动了 *main.rs* 文件之后,修改 *src/bin/main.rs* 文件开头加入如下代码来引入库 crate 并将 `ThreadPool` 引入作用域:
< span class = "filename" > 文件名: src/bin/main.rs< / span >
```rust,ignore
extern crate hello;
use hello::ThreadPool;
```
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
--> src/bin/main.rs:13:16
|
13 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ function or associated item not found in
`hello::ThreadPool`
```
好的,这告诉我们下一步是为 `ThreadPool` 创建一个叫做 `new` 的关联函数。我们还知道 `new` 需要有一个参数可以接受 `4` ,而且 `new` 应该返回 `ThreadPool` 实例。让我们实现拥有此特征的最小化 `new` 函数:
< span class = "filename" > 文件夹: src/lib.rs< / span >
```rust
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
```
这里选择 `usize` 作为 `size` 参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 `usize` 类型的原因,如第三章 “整数类型” 部分所讲。
再次编译检查这段代码:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
--> src/bin/main.rs:18:14
|
18 | pool.execute(|| {
| ^^^^^^^
```
<!-- Can you say a few words on why we would need an execute method, what Rust
needs it for? Also why we need a closure/what indicated that we need a closure
here? -->
<!-- *Rust* doesn't need it, the thread pool functionality we're working on
implementing needs it. I've tried to clarify without getting too repetitive
with the "Creating a Similar Interface for a Finite Number of Threads" section
/Carol -->
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 “为有限数量的线程创建一个类似的接口” 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
我们会在 `ThreadPool` 上定义 `execute` 函数来获取一个闭包参数。回忆第十三章的 “使用带有泛型和 `Fn` trait 的闭包” 部分,闭包作为参数时可以使用三个不同的 trait: `Fn`、`FnMut` 和 `FnOnce` 。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 `thread::spawn` ,所以我们可以观察 `thread::spawn` 的签名在其参数中使用了何种 bound。查看文档会发现:
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
```
`F` 是这里我们关心的参数;`T` 与返回值有关所以我们并不关心。考虑到 `spawn` 使用 `FnOnce` 作为 `F` 的 trait bound, 这可能也是我们需要的, 因为最终会将传递给 `execute` 的参数传给 `spawn` 。因为处理请求的线程只会执行闭包一次,这也进一步确认了 `FnOnce` 是我们需要的 trait, 这里符合 `FnOnce` 中 `Once` 的意思。
<!-- Above - - why does that second reason mean FnOnce is the trait to use, can
you remind us? -->
<!-- Attempted, we're just pointing out that it's in the name Fn*Once* /Carol
-->
`F` 还有 trait bound `Send` 和生命周期绑定 `'static` ,这对我们的情况也是有意义的:需要 `Send` 来将闭包从一个线程转移到另一个线程,而 `'static` 是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 `F` 的 `ThreadPool` 的 `execute` 方法:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
# pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn execute< F > (& self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
```
`FnOnce` trait 仍然需要之后的 `()` ,因为这里的 `FnOnce` 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。
这里再一次增加了 `execute` 方法的最小化实现,它没有做任何工作。再次进行检查:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
warning: unused variable: `f`
--> src/lib.rs:8:30
|
8 | pub fn execute< F > (& self, f: F)
| ^
|
= note: to avoid this warning, consider using `_f` instead
```
现在就只有警告了!这意味着能够编译了!注意如果尝试 `cargo run` 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 `execute` 的闭包!
> 一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。这是一个提醒大家的好时机,实际上这并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 **并且** 拥有期望的行为。
#### 在 `new` 中验证池中线程数量
这里仍然存在警告是因为其并没有对 `new` 和 `execute` 的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 `new` 作为开始。
之前选择使用无符号类型作为 `size` 参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 `u32` 值。让我们增加在返回 `ThreadPool` 实例之前检查 `size` 是否大于零的代码,并使用 `assert!` 宏在得到零时 panic, 如示例 20-13 所示:
在返回 `ThreadPool` 之前检查 `size` 是否大于零,并使用 `assert!` 宏在得到零时 panic, 如列表 20-13 所示:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
# pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
```
< span class = "caption" > 示例 20-13: 实现 `ThreadPool::new` 在 `size` 为零时 panic</ span >
趁着这个机会我们用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何!
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中示例 12-9 中 `Config::new` 那样返回一个 `Result` ,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的 `new` 版本来感受一下两者的区别:
```rust,ignore
fn new(size: usize) -> Result< ThreadPool , PoolCreationError > {
```
#### 分配空间以储存线程
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回之前将他们储存在 `ThreadPool` 结构体中。
这引出了另一个问题:如何 “储存” 一个线程?让我们再看看 `thread::spawn` 的签名:
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
```
`spawn` 返回 `JoinHandle<T>` ,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()` 。
示例 20-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 `ThreadPool` 的定义来存放一个 `thread::JoinHandle<()>` 的 vector 实例,使用 `size` 容量来初始化,并设置一个 `for` 循环了来运行创建线程的代码,并返回包含这些线程的 `ThreadPool` 实例:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore
use std::thread;
pub struct ThreadPool {
threads: Vec< thread::JoinHandle < ( ) > >,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
// --snip--
}
```
< span class = "caption" > 示例 20-14: 为 `ThreadPool` 创建一个 vector 来存放线程</ span >
这里将 `std::thread` 引入库 crate 的作用域,因为使用了 `thread::JoinHandle` 作为 `ThreadPool` 中 vector 元素的类型。
在得到了有效的数量之后,`ThreadPool` 新建一个存放 `size` 个元素的 vector。本书还未使用过 `with_capacity` ,它与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。
如果再次运行 `cargo check` ,会看到一些警告,不过应该可以编译成功。
#### `Worker` 结构体负责从 `ThreadPool` 中将代码传递给线程
<!-- I wasn't sure what this next paragraph was relevant to, can you connect it
up more clearly?-->
<!-- This is where we're actually getting into the meat of the implementation,
I've tried to make it clearer :( /Carol-->
示例 20-14 的 `for` 循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。标准库提供的创建线程的方法,`thread::spawn`,它期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须自己实现。
<!-- Can you say how doing this refactoring will improve the code - - why don't
we want the pool to store threads directly? (I got that from the listing
caption because I wasn't sure what the end game was) -->
<!-- I hope the end game is now clearer in the previous paragraph: we *can't*
store the threads directly and get the behavior we want. /Carol -->
我们将要实现的行为是创建线程并稍后发送代码,这会在 `ThreadPool` 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 `Worker` :这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。
<!-- I was unclear on what a worker actually is here - - is this a
programming/Rust term, or just what we're calling the struct? Can you make it
clearer what the worker is and its responsibilities? -->
<!-- I've tried in the previous paragraph; it's a common term in job
queue/pooling implementations in programming in general but I think should make
sense in plain English with the real-life metaphor I've added /Carol -->
不同于在线程池中储存一个 `JoinHandle<()>` 实例的 vector, 我们会储存 `Worker` 结构体的实例。每一个 `Worker` 会储存一个单独的 `JoinHandle<()>` 实例。接着会在
`Worker` 上实现一个方法,它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker `id` ,这样就可以在日志和调试中区别线程池中的不同 worker。
首先,让我们做出如此创建 `ThreadPool` 时所需的修改。在通过如下方式设置完 `Worker` 之后,我们会实现向线程发送闭包的代码:
1. 定义 `Worker` 结构体存放 `id` 和 `JoinHandle<()>`
2. 修改 `ThreadPool` 存放一个 `Worker` 实例的 vector
3. 定义 `Worker::new` 函数,它获取一个 `id` 数字并返回一个带有 `id` 和用空闭包分配的线程的 `Worker` 实例
4. 在 `ThreadPool::new` 中,使用 `for` 循环计数生成 `id` ,使用这个 `id` 新建 `Worker` ,并储存进 vector 中
如果你渴望挑战,在查示例 20-15 中的代码之前尝试自己实现这些修改。
准备好了吗?示例 20-15 就是一个做出了这些修改的例子:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
use std::thread;
pub struct ThreadPool {
workers: Vec< Worker > ,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers
}
}
// --snip--
}
struct Worker {
id: usize,
thread: thread::JoinHandle< ()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
```
< span class = "caption" > 示例 20-15: 修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程</ span >
这里将 `ThreadPool` 中字段名从 `threads` 改为 `workers` ,因为它现在储存 `Worker` 而不是 `JoinHandle<()>` 。使用 `for` 循环中的计数作为 `Worker::new` 的参数,并将每一个新建的 `Worker` 储存在叫做 `workers` 的 vector 中。
`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/bin/main.rs* 中的 server) 并不需要知道关于 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>` 。
这段代码能够编译并用指定给 `ThreadPool::new` 的参数创建储存了一系列的 `Worker` 实例,不过 **仍然** 没有处理 `execute` 中得到的闭包。让我们聊聊接下来怎么做。
#### 使用通道向线程发送请求
下一个需要解决的问题是传递给 `thread::spawn` 的闭包完全没有做任何工作。目前,我们在 `execute` 方法中获得期望执行的闭包,不过在创建 `ThreadPool` 的过程中创建每一个 `Worker` 时需要向 `thread::spawn` 传递一个闭包。
我们希望刚创建的 `Worker` 结构体能够从 `ThreadPool` 的队列中获取需要执行的代码,并发送到线程中执行他们。
在第十六章,我们学习了 **通道** —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的。这里通道将充当任务队列的作用,`execute` 将通过 `ThreadPool` 向其中线程正在寻找工作的 `Worker` 实例发送任务。如下是这个计划:
1. `ThreadPool` 会创建一个通道并充当发送端。
2. 每个 `Worker` 将会充当通道的接收端。
3. 新建一个 `Job` 结构体来存放用于向通道中发送的闭包。
4. `execute` 方法会在通道发送端发出期望执行的任务。
5. 在线程中,`Worker` 会遍历通道的接收端并执行任何接收到的任务。
让我们以在 `ThreadPool::new` 中创建通道并让 `ThreadPool` 实例充当发送端开始,如示例 20-16 所示。`Job` 是将在通道中发出的类型;目前它是一个没有任何内容的结构体:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
# use std::thread;
// --snip--
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec< Worker > ,
sender: mpsc::Sender< Job > ,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
#
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
# fn new(id: usize) -> Worker {
# let thread = thread::spawn(|| {});
#
# Worker {
# id,
# thread,
# }
# }
# }
```
< span class = "caption" > 示例 20-16: 修改 `ThreadPool` 来储存一个发送 `Job` 实例的通道发送端</ span >
在 `ThreadPool::new` 中,新建了一个通道,并接着让线程池在接收端等待。这段代码能够编译,不过仍有警告。
让我们尝试在线程池创建每个 worker 时将通道的接收端传递给他们。须知我们希望在 worker 所分配的线程中使用通道的接收端,所以将在闭包中引用 `receiver` 参数。示例 20-17 中展示的代码还不能编译:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver< Job > ) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
```
< span class = "caption" > 示例 20-17: 将通道的接收端传递给 worker< / span >
这是一些小而直观的修改:将通道的接收端传递进了 `Worker::new` ,并接着在闭包中使用它。
如果尝试 check 代码,会得到这个错误:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here in
previous iteration of loop
|
= note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>` , which does not implement the `Copy` trait
```
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的, 回忆第十六章: Rust 所提供的通道实现是多 **生产者** ,单 **消费者** 的。这意味着不能简单的克隆通道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 `receiver` ,在线程间分发任务。
<!-- Above - you may be able to tell I struggled to follow this explanation,
can you double check my edits and correct here? -->
<!-- Yep, the text we had here was nonsensical. The edits are fine! /Carol -->
另外,从通道队列中取出任务涉及到修改 `receiver` ,所以这些线程需要一个能安全的共享和修改 `receiver` 的方式,否则可能导致竞争状态(参考第十六章)。
回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 `Arc<Mutex<T>>` 。`Arc` 使得多个 worker 拥有接收端,而 `Mutex` 则确保一次只有一个 worker 能从接收端得到任务。示例 20-18 展示了所需的修改:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
// --snip--
# let thread = thread::spawn(|| {
# receiver;
# });
#
# Worker {
# id,
# thread,
# }
}
}
```
< span class = "caption" > 示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享通道的接收端</ span >
在 `ThreadPool::new` 中,将通道的接收端放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker, 克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收端的所有权了。
通过这些修改,代码可以编译了!我们做到了!
#### 实现 `execute` 方法
最后让我们实现 `ThreadPool` 上的 `execute` 方法。同时也要修改 `Job` 结构体:它将不再是结构体,`Job` 将是一个有着 `execute` 接收到的闭包类型的 trait 对象的类型别名。第十九章 “类型别名用来创建类型同义词” 部分提到过,类型别名允许将长的类型变短。观察示例 20-19:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
// --snip--
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}
type Job = Box< FnOnce ( ) + Send + ' static > ;
impl ThreadPool {
// --snip--
pub fn execute< F > (& self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
```
< span class = "caption" > 示例 20-19: 为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在通道中发出任务</ span >
在使用 `execute` 得到的闭包新建 `Job` 实例之后,将这些任务从通道的发送端发出。这里调用 `send` 上的 `unwrap` ,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在他们就会一直执行。使用 `unwrap` 是因为我们知道失败不可能发生,即便编译器不这么认为。
不过到此事情还没有结束!在 worker 中,传递给 `thread::spawn` 的闭包仍然还只是 **引用** 了通道的接收端。相反我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们。如示例 20-20 对 `Worker::new` 做出修改:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
(*job)();
}
});
Worker {
id,
thread,
}
}
}
```
< span class = "caption" > 示例 20-20: 在 worker 线程中接收并执行任务< / span >
这里,首先在 `receiver` 上调用了 `lock` 来获取互斥器,接着 `unwrap` 在出现任何错误时 panic。如果互斥器处于一种叫做 **被污染** ( *poisoned*)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 `unwrap` 使其 panic 是正确的行为。请随意将 `unwrap` 改为包含有意义错误信息的 `expect` 。
如果锁定了互斥器,接着调用 `recv` 从通道中接收 `Job` 。最后的 `unwrap` 也绕过了一些错误,这可能发生于持有通道发送端的线程停止的情况,类似于如果接收端关闭时 `send` 方法如何返回 `Err` 一样。
调用 `recv` 会 **阻塞** 当前线程,所以如果还没有任务,其会等待直到有可用的任务。`Mutex< T > ` 确保一次只有一个 `Worker` 线程尝试请求任务。
理论上这段代码应该能够编译。不幸的是, Rust 编译器仍不够完美,会给出如下错误:
```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
--> src/lib.rs:63:17
|
63 | (*job)();
| ^^^^^^
```
这个错误非常的神秘,因为这个问题本身就很神秘。为了调用储存在 `Box<T>` (这正是 `Job` 别名的类型)中的 `FnOnce` 闭包,该闭包需要能将自己移动 **出** `Box<T>` ,因为当调用这个闭包时,它获取 `self` 的所有权。通常来说,将值移动出 `Box<T>` 是不被允许的,因为 Rust 不知道 `Box<T>` 中的值将会有多大;回忆第十五章能够正常使用 `Box<T>` 是因为我们将未知大小的值储存进 `Box<T>` 从而得到已知大小的值。
第十七章曾见过,示例 17-15 中有使用了 `self: Box<Self>` 语法的方法,它允许方法获取储存在 `Box<T>` 中的 `Self` 值的所有权。这正是我们希望做的,然而不幸的是 Rust 不允许我们这么做: Rust 当闭包被调用时行为的那部分并没有使用 `self: Box<Self>` 实现。所以这里 Rust 也不知道它可以使用 `self: Box<Self>` 来获取闭包的所有权并将闭包移动出 `Box<T>` 。
Rust 仍在努力改进提升编译器的过程中,不过将来示例 20-20 中的代码应该能够正常工作。有很多像你一样的人正在修复这个以及其他问题!当你结束了本书的阅读,我们希望看到你也成为他们中的一员。
不过目前让我们通过一个小技巧来绕过这个问题。可以显式的告诉 Rust 在这里我们可以使用 `self: Box<Self>` 来获取 `Box<T>` 中值的所有权,而一旦获取了闭包的所有权就可以调用它了。这涉及到定义一个新 trait, 它带有一个在签名中使用 `self: Box<Self>` 的方法 `call_box` ,为任何实现了 `FnOnce()` 的类型定义这个 trait, 修改类型别名来使用这个新 trait, 并修改 `Worker` 使用 `call_box` 方法。这些修改如示例 20-21 所示:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore
trait FnBox {
fn call_box(self: Box< Self > );
}
impl< F: FnOnce ( ) > FnBox for F {
fn call_box(self: Box< F > ) {
(*self)()
}
}
type Job = Box< FnBox + Send + ' static > ;
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
```
< span class = "caption" > 示例 20-21: 新增一个 trait `FnBox` 来绕过当前 `Box<FnOnce()>` 的限制</ span >
首先,新建了一个叫做 `FnBox` 的 trait。这个 trait 有一个方法 `call_box` ,它类似于其他 `Fn*` trait 中的 `call` 方法,除了它获取 `self: Box<Self>` 以便获取 `self` 的所有权并将值从 `Box<T>` 中移动出来。
接下来,为任何实现了 `FnOnce()` trait 的类型 `F` 实现 `FnBox` trait。这实际上意味着任何 `FnOnce()` 闭包都可以使用 `call_box` 方法。`call_box` 的实现使用 `(*self)()` 将闭包移动出 `Box<T>` 并调用此闭包。
现在我们需要 `Job` 类型别名是任何实现了新 trait `FnBox` 的 `Box` 。这允许我们在得到 `Job` 值时使用 `Worker` 中的 `call_box` 。为任何 `FnOnce()` 闭包都实现了 `FnBox` trait 意味着无需对实际在通道中发出的值做任何修改。
最后,对于 `Worker::new` 的线程中所运行的闭包,调用 `call_box` 而不是直接执行闭包。现在 Rust 就能够理解我们的行为是正确的了。
这是非常狡猾且复杂的手段。无需过分担心他们并不是非常有道理;总有一天,这一切将是毫无必要的。
通过这个技巧,线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求:
```text
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec< Worker > ,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `id`
--> src/lib.rs:61:5
|
61 | id: usize,
| ^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `thread`
--> src/lib.rs:62:5
|
62 | thread: thread::JoinHandle< ()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```
成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当 server 收到大量请求时系统也不会负担过重。如果请求 `/sleep` , server 也能够通过另外一个线程处理其他请求。
在学习了第十八章的 `while let` 循环之后,你可能会好奇为何不能如此编写 worker 线程:
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
```
< span class = "caption" > 示例 20-22: 一个使用 `while let` 的 `Worker::new` 替代实现</ span >
这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。如此的原因有些微妙:`Mutex` 结构体没有公有 `unlock` 方法,因为锁的所有权依赖 `lock` 方法返回的 `LockResult<MutexGuard<T>>` 中 `MutexGuard<T>` 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 `Mutex` 守护的资源,不过如果没有认真的思考 `MutexGuard<T>` 的生命周期的话,也可能会导致比预期更久的持有锁。因为 `while` 表达式中的值在整个块一直处于作用域中,`job.call_box()` 调用的过程中其仍然持有锁,这意味着其他 worker 不能接收任务。
相反通过使用 `loop` 并在循环块之内而不是之外获取锁和任务,`lock` 方法返回的 `MutexGuard` 在 `let job` 语句结束之后立刻就被丢弃了。这确保了 `recv` 调用过程中持有锁,而在 `job.call_box()` 调用前锁就被释放了,这就允许并发处理多个请求了。