共享状态的并发

ch16-03-shared-state.md
commit 856d89c53a6d69470bb5669c773fdfe6aab6fcc9

消息传递是一个很好的处理并发的方式,但并不是唯一一个。另一种方式是让多个线程访问同一块内存中的数据(共享状态)。再考虑一下 Go 语言文档中的这句口号:“不要通过共享内存来通讯”(“do not communicate by sharing memory.”

通过共享内存进行通信,会是什么样的代码?此外,为什么喜欢消息传递的人会警告:谨慎使用内存共享?

在某种程度上,任何编程语言中的信道都类似于单所有权,因为一旦将一个值传送到信道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。在 15 章中,我们介绍了智能指针可以实现多所有权,然而这会增加额外的复杂性,因为需要管理多个所有者。Rust 的类型系统和所有权规则在正确管理这些问题上提供了极大的帮助:举个例子,让我们来看看 互斥器,一个较常见的共享内存并发原语。

使用互斥器,实现同一时刻只允许一个线程访问数据

互斥器mutex)是 互相排斥(mutual exclusion)的缩写。在同一时刻,其只允许一个线程对数据拥有访问权。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 lock)来表明其希望访问数据。锁是一个数据结构,作为互斥器的一部分,它记录谁有数据的专属访问权。因此我们讲,互斥器通过锁系统 保护guarding)其数据。

互斥器以难以使用著称(译注:原文指互斥器在其他编程语言中难以使用),因为你必须记住:

  1. 在使用数据之前,必须获取锁。
  2. 使用完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。

作为一个现实中互斥器的例子,想象一下在某个会议的一次小组座谈会中,只有一个麦克风。如果一位成员要发言,他必须请求或表示希望使用麦克风。得到了麦克风后,他可以畅所欲言,讲完后再将麦克风交给下一位希望讲话的成员。如果一位成员结束发言后忘记将麦克风交还,其他人将无法发言。如果对共享麦克风的管理出现了问题,座谈会将无法正常进行!

正确的管理互斥器异常复杂,这也是许多人之所以热衷于信道的原因。然而,在 Rust 中,得益于类型系统和所有权,我们不会在锁和解锁上出错。

Mutex<T>的 API

我们先从在单线程环境中使用互斥器开始,作为展示其用法的一个例子,如示例 16-12 所示:

文件名:src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}

示例 16-12: 出于简单的考虑,在一个单线程上下文中探索 Mutex<T> 的 API

像很多类型一样,我们使用关联函数 new 来创建一个 Mutex<T>。使用 lock 方法来获取锁,从而可以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。

如果另一个线程拥有锁,并且那个线程 panic 了,则 lock 调用会失败。在这种情况下,没人能够再获取锁,所以我们调用 unwrap,使当前线程 panic。

一旦获取了锁,就可以将返回值(命名为 num)视为一个其内部数据(i32)的可变引用了。类型系统确保了我们在使用 m 中的值之前获取锁。m 的类型是 Mutex<i32> 而不是 i32,所以 必须 获取锁才能使用这个 i32 值。我们是不会忘记这么做的,因为如果没有获取锁,类型系统就不允许访问内部的 i32 值。

正如你所猜想的,Mutex<T> 是一个智能指针。更准确的说,lock 调用 返回 一个叫做 MutexGuard 的智能指针。这个智能指针实现了 Deref 来指向其内部数据;它也实现了 Drop,当 MutexGuard 离开作用域时,自动释放锁(发生在示例 16-12 内部作用域的结尾)。有了这个特性,就不会有忘记释放锁的潜在风险(忘记释放锁会使互斥器无法再被其它线程使用),因为锁的释放是自动发生的。

释放锁之后,我们可以打印出互斥器内部的 i32 值,并发现我们刚刚已经将其值改为 6。

在线程间共享 Mutex<T>

现在让我们尝试使用 Mutex<T> 在多个线程间共享同一个值。我们将启动 10 个线程,并在各个线程中对同一个计数器值加 1,这样计数器将从 0 变为 10。示例 16-13 中的例子会出现编译错误,而我们将通过这些错误来学习如何使用 Mutex<T>,以及 Rust 又是如何帮助我们正确使用的。

文件名:src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-13: 程序启动了 10 个线程,每个线程都通过 Mutex<T> 来增加计数器的值

这里创建了一个 counter 变量来存放内含 i32Mutex<T>,类似示例 16-12 那样。接下来我们遍历整数区间,创建了 10 个线程。我们使用了 thread::spawn,并为所有线程传入了相同的闭包:它们每一个都将调用 lock 方法来获取 Mutex<T> 上的锁,接着将互斥器中的值加一。当一个线程结束执行,num 会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。

在主线程中,我们像示例 16-2 那样收集了所有的 JoinHandle,并调用它们的 join 方法来等待所有线程结束。然后,主线程会获取锁,并打印出程序的结果。

之前提示过,这个例子不能编译,让我们看看为什么!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8  |     for _ in 0..10 {
   |     -------------- inside of this loop
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
8  ~     let mut value = counter.lock();
9  ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

错误信息表明 counter 值在上一次循环中被移动了。所以 Rust 告诉我们,不能将 counter 锁的所有权移动到多个线程中。让我们通过一个第 15 章讨论过的多所有权手段,来修复这个编译错误。

多线程和多所有权

在第 15 章中,我们用智能指针 Rc<T> 来创建引用计数,使得一个值有了多个所有者。让我们做同样的事,看看会发生什么。将示例 16-14 中的 Mutex<T> 封装进 Rc<T> 中,并在将所有权移入线程之前克隆(clone) Rc<T>

文件名:src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-14: 尝试使用 Rc<T> 来允许多个线程拥有 Mutex<T>

再一次编译并...出现了不同的错误!编译器真是教会了我们很多!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`, which is required by `{closure@src/main.rs:11:36: 11:43}: Send`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/eeb90cda1969383f56a2637cbd3037bdf598841c/library/std/src/thread/mod.rs:688:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

哇哦,错误信息太长不看!划重点:第一行错误表明 Rc<Mutex<i32>> 不能在线程间安全传递(`Rc<Mutex<i32>>` cannot be sent between threads safely);编译器也指出了原因:Rc<Mutex<i32>> 没有实现 Send trait(the trait `Send` is not implemented for `Rc<Mutex<i32>>`)。下一节我们会讲到 Send:这是一个确保所使用的类型可以用于并发环境的 trait。

不幸的是,Rc<T> 并不能安全的在线程间共享。当 Rc<T> 管理引用计数时,它必须在每一个 clone 调用时增加计数,并在每一个克隆体被丢弃时减少计数。Rc<T> 并没有使用任何并发原语,无法确保改变计数的操作不会被其他线程打断。这可能使计数出错,并导致诡异的 bug,比如可能会造成内存泄漏,或在使用结束之前就丢弃一个值。我们所需要的是一个与 Rc<T> 完全一致,又以线程安全的方式改变引用计数的类型。

原子引用计数 Arc<T>

所幸 Arc<T> 正是这么一个类似 Rc<T> 并可以安全的用于并发环境的类型。字母 “a” 代表 原子性atomic),所以这是一个 原子引用计数atomically reference counted)类型。原子类型 (Atomics) 是另一类这里还未涉及到的并发原语:请查看标准库中 std::sync::atomic 的文档来获取更多细节。目前我们只需要知道:原子类型就像基本类型一样,可以安全的在线程间共享。

你可能会好奇,为什么不是所有的基本类型都是原子性的?为什么标准库中的类型没有全部默认使用 Arc<T> 实现?原因在于,线程安全会造成性能损失,我们希望只在必要时才为此买单。如果只是在单线程中对值进行操作,原子性提供的保证并无必要,而不加入原子性可以使代码运行得更快。

回到之前的例子:Arc<T>Rc<T> 有着相同的 API,所以我们只需修改程序中的 use 行、new 调用和 clone 调用。示例 16-15 中的代码最终可以编译和运行:

文件名:src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

示例 16-15: 使用 Arc<T> 包装一个 Mutex<T> 能够实现在多线程之间共享所有权

这会打印出:

Result: 10

成功了!我们从 0 数到了 10,这好像没啥大不了的,不过一路上我们确实学习了很多关于 Mutex<T> 和线程安全的内容!这个例子中构建的结构可以用于比增加计数更为复杂的操作。使用这个策略,我们可将计算任务分成独立的部分,并分散到多个线程中,接着使用 Mutex<T> 使用各自的运算结果来更新最终的结果。

注意,对于简单的数值运算,标准库中 std::sync::atomic 模块 提供了比 Mutex<T> 更简单的类型。针对基本类型,这些类型提供了安全、并发、原子的操作。在上面的例子中,为了专注于讲明白 Mutex<T> 的用法,我们才选择在基本类型上使用 Mutex<T>。(译注:对于上面例子中出现的 i32 加法操作,更好的做法是使用 AtomicI32 类型来完成。具体参考文档。)

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 的相似性

你可能注意到了,尽管 counter 是不可变的,我们仍然可以获取其内部值的可变引用;这意味着 Mutex<T> 提供了内部可变性,就像 Cell 系列类型那样。使用 RefCell<T> 可以改变 Rc<T> 中内容(在 15 章中讲到过),同样地,使用 Mutex<T> 我们也可以改变 Arc<T> 中的内容。

另一个值得注意的细节是,Rust 不能完全避免使用 Mutex<T> 所带来的逻辑错误。回忆一下,第 15 章中讲过,使用 Rc<T> 就有造成引用循环的风险:两个 Rc<T> 值相互引用,造成内存泄漏。同理,Mutex<T> 也有造成 死锁deadlock)的风险:当某个操作需要锁住两个资源,而两个线程分别持有两个资源的其中一个锁时,它们会永远相互等待。如果你对这个话题感兴趣,尝试编写一个带有死锁的 Rust 程序,接着研究别的语言中使用互斥器的死锁规避策略,并尝试在 Rust 中实现它们。标准库中 Mutex<T>MutexGuard 的 API 文档会提供有用的信息。

接下来,为了丰富本章的内容,让我们讨论一下 SendSync trait,以及如何对自定义类型使用它们。