diff --git a/10_Ideas_and_Inspiration.md b/10_Ideas_and_Inspiration.md index b3b9da6..862b7c8 100644 --- a/10_Ideas_and_Inspiration.md +++ b/10_Ideas_and_Inspiration.md @@ -84,11 +84,11 @@ Windows SRW 锁([第8章中的“一个轻巧的读写锁”](./8_Operating_Sy * [关于 Windows SRW 锁的实现](https://oreil.ly/El8GA) * [基于队列的锁的 Rust 实现](https://oreil.ly/aFyg1) -## 基于 Parking 的锁 +## 基于阻塞的锁 为了创建一个尽可能小的高效 mutex,你可以通过将队列移动到全局的数据结构,在 mutex 自身只留下 1 或者 2 个位,来构建基于队列的锁的想法。这样,mutex 仅需要是一个字节。你甚至可以把它放置在一些未使用的指针位中,这允许非常细粒度的锁定,几乎没有其他额外的开销。 -全局的数据结构可以是一个 `HashMap`,将内存地址映射到等待该地址的 mutex 的线程队列。全局的数据结构通常叫做 `parking lot`,因为它是一组被 `park` 的线程合集。 +全局的数据结构可以是一个 `HashMap`,将内存地址映射到等待该地址的 mutex 的线程队列。全局的数据结构通常叫做 `parking lot`,因为它是一组被阻塞(`park`)的线程合集。 ![ ](./picture/raal_10in05.png) diff --git a/1_Basic_of_Rust_Concurrency.md b/1_Basic_of_Rust_Concurrency.md index 92f7ea3..72cc606 100644 --- a/1_Basic_of_Rust_Concurrency.md +++ b/1_Basic_of_Rust_Concurrency.md @@ -1,45 +1,888 @@ # 第一章:Rust 并发基础 +早在多核处理器司空见惯之前,操作系统就允许一台计算机运行多个程序。这是通过在进程之间快速切换来完成的,允许每个进程逐个地重重地取得一点进展。现在,几乎所有的电脑,甚至手机和手表都有着多核处理器,可以真正并行执行多个程序。 + +操作系统尽可能的将进程之间隔离,允许程序完全意识不到其他线程在做什么的情况下做自己的事情。例如,在不先询问操作系统内核的情况下,一个进程通常不能获取其他进程的内存,或者以任意方式与之通信。 + +然而,一个程序可以产生额外的*执行线程*作为*进程*的一部分。同一进程中的线程不会相互隔离。线程共享内存并且可以通过内存相互交互。 + +这一章节将阐述在 Rust 中如何产生线程,并且关于它们的所有基本概念,例如如何安全地在多个线程之间共享数据。本章中解释的概念是本书其余部分的基础。 + +> 如果你已经熟悉 Rust 中的这些部分,你可以随时跳过。然而,在你继续下一章节之前,请确保你对线程、内部可变性、Send 和 Sync 有一个好的理解,以及知道什么是互斥锁[^2]、条件变量[^1]以及线程阻塞(park)[^3]。 + ## Rust 中的线程 +每个程序都从一个线程开始:主(main)线程。该线程将执行你的 main 函数,并且如果需要,它可以用于产生更多线程。 + +在 Rust 中,新线程使用来自标准库的 `std::thread::spawn` 函数产生。它接受一个参数:新线程执行的函数。一旦该函数停止,将立刻返回。 + +让我们看一个示例: + +```rust +use std::thread; + +fn main() { + thread::spawn(f); + thread::spawn(f); + + println!("Hello from the main thread."); +} + +fn f() { + println!("Hello from another thread!"); + + let id = thread::current().id(); + println!("This is my thread id: {id:?}"); +} +``` + +我们产生两个线程,它们都将执行 f 作为它们的主函数。这两个线程将输出一个信息并且展示它们的*线程 id*,主线程也将输出它自己的信息。 + +
+

Thread ID

+ Rust 标准库位每一个线程分配一个唯一的标识符。此标识符可以通过 Thread::id() 访问并且拥有 ThreadId 类型。除了复制 ThreadId 以及检查它们相等外,你也做不了什么。不能保证这些 ID 将会连续分配,只是每个线程都会有所不同。 +
+ +如果你运行我们上面的示例几次,你可能注意到输出在运行之间有所不同。一次特定的运行在机器上的输出: + +```txt +Hello from the main thread. +Hello from another thread! +This is my thread id: +``` + +惊讶地是,部分输出似乎失去了。 + +这里发生的情况是:新的线程结束执行它们的函数之前,主线程结束执行了主函数。 + +从主函数返回将退出整个程序,即使所有线程仍然在运行。 + +在这个特定的示例中,在程序被主线程关闭之前,其中一个新的线程有足够的消息到达第二条消息的一半。 + +如果我们想要在主函数返回之前,确保线程结束,我们可以通过 `join` 它们来等待。未来这样做,我们在 `spawn` 函数返回后使用 `JoinHandle`: + +```rust +fn main() { + let t1 = thread::spawn(f); + let t2 = thread::spawn(f); + + println!("Hello from the main thread."); + + t1.join().unwrap(); + t2.join().unwrap(); +} +``` + +`.join()` 方法等待直到线程结束执行并且返回 `std::thread::Result`。如果线程由于 panic 不能成功地完成它的函数,这将包含 panic 消息。我们试图去处理这种情况,或者在 join panic 线程仅调用 `.unwrap()` 去 panic。 + +运行我们程序的这个版本,将不再导致截断的输出: + +```txt +Hello from the main thread. +Hello from another thread! +This is my thread id: ThreadId(3) +Hello from another thread! +This is my thread id: ThreadId(2) +``` + +唯一仍然改变的是消息的打印顺序: + +```txt +Hello from the main thread. +Hello from another thread! +Hello from another thread! +This is my thread id: ThreadId(2) +This is my thread id: ThreadId(3) +``` + +
+

输出锁定

+ println 宏使用 std::io::Stdout::lock() 去确保输出没有被中断。println!() 将等待直到任意并发地运行完成后,在写入输出。如果不是这样,我们可以得到更多的交叉输出: + +

+ Hello fromHello from another thread! + another This is my threthreadHello fromthread id: ThreadId! + ( the main thread. + 2)This is my thread + id: ThreadId(3) +

+
+ +与其将函数的名称传递给 `std::thread::spawn`,不如像我们上面的示例那样,传递一个*闭包*。这允许我们捕获值移动到新的线程: + +```rust +let numbers = vec![1, 2, 3]; + +thread::spawn(move || { + for n in &numbers { + println!("{n}"); + } +}).join().unwrap(); +``` + +在这里,numbers 的所有权被转移到新产生的线程,因为我们使用了 `move` 闭包。如果我们没有使用 `move` 关键字,闭包将会通过引用补货 numbers。这将导致一个编译器错误,因为新的线程超出变量的生命周期。 + +由于线程可能运行直到程序执行结束,因此产生的线程在它的参数类型上有 `'static` 生命周期绑定。换句话说,它只接受永久保留的函数。闭包通过引用捕获局部变量不能够永久保留,因为当局部变量不存在时,引用将变得无效。 + +从线程中取回一个值,是从闭包中返回完成的。该返回值通过 `join` 方法返回的 `Result` 中获取: + +```rust +let numbers = Vec::from_iter(0..=1000); + +let t = thread::spawn(move || { + let len = numbers.len(); + let sum = numbers.iter().sum::(); + sum / len // 1 +}); + +let average = t.join().unwrap(); // 2 + +println!("average: {average}"); +``` + +在这里,线程闭包(1)返回的值通过 `join` 方法发送回主线程。 + +如果 numbers 是空的,当它尝试去除以 0 时(2),线程将发生 panic,而 `join` 将会发生 panic 消息,将由于 `unwarp` 导致主线程也 panic。 + +
+

Thread Builder

+ std::thread::spawn 函数事实上仅是 std::thread::Builder::new().spawn().unwrap() 的快捷缩写。 + + std::thread::Builder 允许你在产生线程之前为新线程设置一些设置。你可以使用它为新线程配置栈大小并给新线程一个名字。线程的名字是可以通过 std::thread::current().name() 获得,这将在 panic 消息中可用,并在监控和大多数雕饰工具中可见。 + + 此外,Builder 的产生函数返回一个 std::io::Result,允许你处理新线程失败的情况。如果操作系统内存不足,或者资源限制已经应用于你对程序,这是可能发生的。如果 std::thread::spawn 函数不能去产生一个新线程,它只会 panic。 +
+ ## 线程作用域 +如果我们确信生成的线程不会比某个范围存活更久,那么线程可以安全地借用哪些不会一直存在的东西,例如局部变量,只要它们比该范围活得更久。 + +Rust 标准库提供了 `std::thread::scope` 去产生此类*线程作用域*。它允许我们产生不超过我们传递给该函数闭包的范围的线程,这使它可能安全地借用局部变量。 + +它的工作原理最好使用一个示例来展示: + +```rust +let numbers = vec![1, 2, 3]; + +thread::scope(|s| { // 1 + s.spawn(|| { // 2 + println!("length: {}", numbers.len()); + }); + s.spawn(|| { // 2 + for n in &numbers { + println!("{n}"); + } + }); +}); // 3 +``` + +1. 我们使用闭包调用 `std::thread::scope` 函数。我们的闭包是直接执行,并得到一个参数,`s`,表示作用域。 +2. 我们使用 `s` 去产生线程。该闭包可以借用本地变量,例如 numbers。 +3. 当作用域结束,所有仍没有 join 的线程都会自动 join。 + +这种模式保证了,在作用域产生的线程没有会比作用域更长的生命周期。因此,作用域中的 `spawn` 方法在它的参数类型中没有一个 `'static` 约束,允许我们去引用任何东西,只要它比作用域有更长的生命周期,例如 numbers。 + +在以上示例中,这两个线程并发地获取 numbers。这是没问题的,因为它们其中的任何一个(或者主线程)都没有修改它。如果我们改变第一个线程去修改 numbers,正如下面展示的,编译器将不允许我们也产生另一个也使用数字的线程: + +```rust +let mut numbers = vec![1, 2, 3]; + +thread::scope(|s| { + s.spawn(|| { + numbers.push(1); + }); + s.spawn(|| { + numbers.push(2); // Error! + }); +}); +``` + +确切的错误信息以待遇 Rust 编译器版本,因为它经常被改进以提升更好的判断,但是试图去编译以上代码将导致一下问题: + +```txt +error[E0499]: cannot borrow `numbers` as mutable more than once at a time + --> example.rs:7:13 + | +4 | s.spawn(|| { + | -- first mutable borrow occurs here +5 | numbers.push(1); + | ------- first borrow occurs due to use of `numbers` in closure + | +7 | s.spawn(|| { + | ^^ second mutable borrow occurs here +8 | numbers.push(2); + | ------- second borrow occurs due to use of `numbers` in closure +``` + +
+

泄漏启示录

+ 在 Rust 1.0 之前,标准库有一个函数叫做 std::thread::scoped,它将直接产生一个线程,就像 std::thread::spawn。它允许无 'static 的捕获,因为它返回的不是 JoinGuard,而是当被 drop 时 join 到线程的 JoinGuard。任意的借用数据仅需要比这个 JoinGuard 活得更久。只要 JoinGuard 在某个时候被 drop,这似乎是安全的。 + + 就在 Rust 1.0 发布之前,人们慢慢发现它似乎不能保证某些东西被 drop。有很多种方式没有 drop 它,例如创建一个引用计数节点的循环,可以忘记某些东西或者*泄漏*它。 + + 最终,在一些人提及的“泄漏启示录”中得到结论,(安全)接口的设计不能依赖假设对象总是在它们的生命周期结束后 drop。泄漏一个对象可能会导致泄漏更多对象(例如,泄漏一个 Vec 将也导致泄漏它的元素),但它并不会导致未定义行为(undefind behavior)[^6]。因此,std::thread::scoped 将不再视为安全的并从标准库移除。此外,std::mem::forget 从一个不安全的函数升级到*安全*的函数,以强调忘记(或泄漏)总是一种可能性。 + + 直到后来,在 Rust 1.63 中,添加了一个新的 std::thread::scope 功能,其新设计不依赖 Drop 来获得正确性。 +
+ ## 共享所有权以及引用计数 +目前,我们已经使用了 `move` 闭包([“Rust 中的线程”](#rust-中的线程))将值的所有权转移到线程并从生命周期较长的父线程借用数据([“线程作用域”](#线程作用域))。当两个线程之间共享数据,它们之间的任何一个线程都不能保证比另一个线程的生命周期长,那么它们都不能称为该数据的所有者。它们之间共享的任何数据都需要与最长生命周期的线程一样长。 + ### Static +有几种方式去创建不属于单线程的东西。最简单的方式是**静态**值,它由整个程序“拥有”,而不是单个线程。在以下示例中,这两个线程都可以获取 X,但是它们并不能拥有它: + +```rust +static X: [i32; 3] = [1, 2, 3]; + +thread::spawn(|| dbg!(&X)); +thread::spawn(|| dbg!(&X)); +``` + +静态项一半由一个常量初始化,它从不会被 drop,并且甚至在程序的主线程开始之前就已经存在。每个线程都可以借用它,因为它保证它总是存在。 + ### 泄漏(Leak) +另一种方式是通过*泄漏*分配的方式共享所有权。使用 `Box::leak`,人们可以释放 `Box` 的所有权,保证永远不会 drop 它。从那时起,`Box` 将永远存在,没有所有者,只要程序运行,任意线程都可以借用它。 + +```rust +let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3])); + +thread::spawn(move || dbg!(x)); +thread::spawn(move || dbg!(x)); +``` + +`move` 闭包可能会让它看起来像我们移动所有权进入线程,但仔细观察 x 的类型就会发现,我们只是给线程一个对数据的*引用*。 + +> 引用是 `Copy` 的,这意味着当你“move”它们的时候,原始内容仍然存在,这就像整数或者布尔内容一样。 + +注意,`'static` 生命周期并不意味着该值自程序开始时就存在,而只是意味着它一直存在到程序的结束。过去并不重要。 + +泄漏 `Box` 的缺点是我们正在泄漏内存。我们分配一些东西,但是从未 drop 和取消分配它。如果仅发生有限的次数,这就可以了。但是如果我们继续这样做,程序将慢慢地耗尽内存。 + ### 引用计数 +为了确保共享数据能够 drop 和取消分配,我们不能完全放弃它的所有权。相反,我们可以*分享所有权*。通过跟踪所有者的数量,我们确保仅当没有所有者时,才会丢弃该值。 + +Rust 标准库通过 `std::rc::Rc` 类型提供了该功能,它是“引用计数”(reference counted)的缩写。它与 `Box` 非常类似,唯一的区别是克隆它将不会分配任何新内存,而是增加存储在包含值旁边的计数器。原始的 `Rc` 和克隆的 `Rc` 将引用相同的内存分配;它们*共享所有权*。 + +```rust +use std::rc::Rc; + +let a = Rc::new([1, 2, 3]); +let b = a.clone(); + +assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation! +``` + +drop 一个 `Rc` 将减少计数。只有最后一个 `Rc`,计数器下降到 0,才会是 drop 和取消分配所包含的数据。 + +如果我们尝试去发送一个 Rc 到另一个线程,然而,我们将造成以下的编译错误: + +```txt +error[E0277]: `Rc` cannot be sent between threads safely + | +8 | thread::spawn(move || dbg!(b)); + | ^^^^^^^^^^^^^^^ +``` + +事实证明,`Rc` 不是*线程安全*的(详见,[线程安全:Send 和 Sync](#线程安全send-和-sync))。如果多个线程有相同分配的 `Rc`,那么它们可能尝试同时修改引用计数,这可能产生不可预测的结果。 + +然而,我们可以使用 `std::sync::Arc`,它代表“原子引用计数”。它与 `Rc` 相同,只是它保证了对引用计数的修改时不可分割的*原子*操作,因此可以安全地与多个线程使用。(详见第二章。) + +```rust +use std::sync::Arc; + +let a = Arc::new([1, 2, 3]); // 1 +let b = a.clone(); // 2 + +thread::spawn(move || dbg!(a)); // 3 +thread::spawn(move || dbg!(b)); // 3 +``` + +1. 我们在新的分配中放置了一个一个数组,以及从一开始的引用计数器。 +2. 克隆 Arc 增加引用计数到两个,并为我们提供相同的分配到第二个 Arc。 +3. 两个 Arc 都获取到自己的 Arc,通过 Arc 它们可以获取共享数组。当它们 drop 它们的 Arc,两者都会减少引用计数。最后一个 drop 它的 Arc 的线程将看见计数器减少到 0,并且将是 drop 和取消分配数组的线程。 + +
+

命名克隆

+ 不得不给每个 Arc 的克隆取一个不同的名称,这可能使得代码变得混乱难以追踪。尽管每个 Arc 的克隆都是一个独立的对象,而通过每个克隆赋予不同的名称并不能很好地反映这一点。 + + Rust 允许(并且鼓励)你通过定义有着新的名称的相同变量去*遮蔽*变量。如果你在相同作用域这么做,则无法再命名原始变量。但是通过打开一个新的作用域,可以使用类似 let a = a.clone(); 的语句在该作用域内重用相同的名称,同时在作用于外保留原始变量的可用性。 + + 通过在新的作用域(使用 {})中封装闭包,我们可以在将变量移动到闭包中之前,进行克隆,并不要重新命名它们。 +
+
+
+let a = Arc::new([1, 2, 3]);
+let b = a.clone();
+thread::spawn(move || {
+   dbg!(b);
+});
+dbg!(a);
+      
+ Arc 克隆存活在相同作用域范围内。每个线程都有自己的克隆,只是名称不同。 +
+
+
+let a = Arc::new([1, 2, 3]);
+thread::spawn({
+    let a = a.clone();
+    move || {
+        dbg!(a);
+    }
+});
+dbg!(a);
+      
+ Arc 的克隆存活在不同的生命周期范围内。我们可以在每个线程使用相同的名称。 +
+
+
+ +因为所有权是共享的,引用计数指针(`Rc` 和 `Arc`)与共享引用(`&T`)有着相同的限制。它们并不能让你对它们包含的值进行可变访问,因为该值在同一时间,可能被其它代码借用。 + +例如,如果我们尝试去排序 `Arc<[i32]>` 中整数的切片,编译器将阻止我们这么做,告诉我们不允许改变数据: + +```txt +error[E0596]: cannot borrow data in an `Arc` as mutable + | +6 | a.sort(); + | ^^^^^^^^ +``` + ## 借用和数据竞争 +在 Rust 中,可以使用两种方式借用值。 + +* *不可变借用* + * 使用 `&` 借用会得到一个*不可变借用*。这样的引用可以被复制。对于它引用数据的访问在所有引用副本之间是共享的。顾名思义,编译器通常不允许你通过这样的引用改变数据,因为那可能会影响当前借用相同数据的其它代码。 +* *可变借用* + * 使用 `&mut` 借用会得到一个*可变借用*。可变借用保证了它是该数据的唯一激活的借用。这确保了可变的数据将不会改变任何其它代码正在查看的数据。 + +这两个概念一起,完全阻止了*数据竞争*:一个线程正在改变数据,而另一个线程正在并发地访问数据的情况。数据竞争通常是*未定义行为*,这意味着编译器不需要考虑这些情况。它只是假设它们并不会发生。 + +为了清晰地表达这个意思,让我们来看一看编译器可以使用借用规则作出有用假设的示例: + +```rust +fn f(a: &i32, b: &mut i32) { + let before = *a; + *b += 1; + let after = *a; + if before != after { + x(); // never happens + } +} +``` + +这里,我们得到一个整数的不可变引用,并在增加 b 所引用的整数进行递增操作之前和之后存储整数的值。编译器可以自由地假设关于借用和数据竞争的基本规则得到了遵守,这意味着 b 不可能引用与 a 相同的整数。实际上,在对 a 进行借用时,整个程序中没有任何地方对 a 借用的整数进行可变借用。因此,编译器可以轻松地推断 `*a` 不会发生变化,并且 `if` 语句将永远不是 true,并且可以作为优化完全地删除 x 调用。 + +除了使用一个不安全的块(`unsafe`)去禁止一些编译器安全地检查,否则不可能去写 Rust 程序打断编译器的假设。 + +
+

未定义行为

+ 类似 C、C++ 和 Rust 都有一套需要遵守的规则,以避免未定义行为。例如,Rust 的规则之一是,对任何对象的可变引用永远不可能超过一个。 + + 在 Rust 中,仅当使用 unsafe 代码块才能打破这些规则。“unsafe”并不意味着代码是错误的或者错位安全使用,而是编译器并没有为你验证代码是安全的。如果代码却是违法了这些规则,则称为不健全的(unsound)。 + + 允许编译器在不检查的情况下假设这些规则从未破坏。当破坏是,这将导致叫做为定义行为的问题,我们需要不惜一切代价去避免。如果我们允许编译器作出与实际不符的假设,那么它可能很容易导致关于代码不同部分更错误的结论,影响你整个程序。 + + 作为一个具体的例子,让我们看看在切片上使用 get_unchecked 方法的小片段: + +
+let a = [123, 456, 789];
+let b = unsafe { a.get_unchecked(index) };
+  
+ get_unchecked 方法给我们一个给定索引的切片元素,就像 a[index],但是允许变异器假设索引总是在边界,没有任何检查。 + + 这意味着,在代码片段中,由于 a 的长度是 3,编译器可能假设索引小雨 3。这使我们确保其假设成立。 + + 如果我们破坏了这个假设,例如,我们以等于 3 的索引运行,任何事情都可能发生。它可能导致读取 `a` 之后存储的任何内存内容。这可能导致程序崩溃。它可能会执行程序中完全无关的部分。它可能会引起各种糟糕的情况。 + + 或许令人惊讶的是,为定义行为可能“回到过去”,导致之前的代码出问题。要理解这种情况是如何发生的,想象我们上面的片段有一个 match 语句,如下: + +
+match index {
+   0 => x(),
+   1 => y(),
+   _ => z(index),
+}
+
+let a = [123, 456, 789];
+let b = unsafe { a.get_unchecked(index) };
+  
+ 由于不安全的代码,允许编译器假设索引只有 0、1 或 2。从逻辑上讲,我们的 match 语句的最后分支仅会匹配到 2,因此 z 仅会调用为 z(2)。这个结论不仅可以优化匹配,还可以优化 z 本身。这可以扔掉代码中未使用的部分。 + + 如果我们以 3 的索引执行此设置,我们的程序试图去执行已优化的部分,导致完全地为定义行为,这还远在我们到达最后一行不安全的块之前。就像这样,未定义行为通过整个程序向后或者向前传播,通过非常意想不到的方式传播。 + + 当调用任何的不安全函数时,读它的文档并确保你完全理解它的安全需求:作为调用者,你需要坚持的假设,以避免未定义行为。 +
+ ## 内部可变性 +上一节介绍的借用规则可能非常有限——尤其涉及多个线程时。遵循这些规则在线程之间通信极其有限,并且是不可能的,因为多个线程访问的数据都无法改变。 + +幸运地是,有一个逃生方式:内部可变性。有着内部可变性的数据类型略微改变了借用规则。在某些情况下,这些类型可以使用“不可变”的引用进行可变。 + +在[“引用计数”](#引用计数)中,我们已经看到一个设计内部可变性的微妙示例。在 `Rc` 和 `Arc` 都变为引用计数器,即使可能有多个克隆都使用相同的引用计数器。 + +一旦设计内部可变性类型,称“不可变”和“可变”将变得混乱和不准确,因为一些类型可以通过两者变得可变。更准确的称呼是“共享”和“独占”:共享引用(`&T`)可以被复制以及与其它引用共享,然而*独占引用*(`&mut T`)保证了仅有一个对 T 的独占借用。对于大多数类型,共享引用并不允许可变,但有一些例外。由于本书我们将主要处理这些例外情况,我们将在这本书的剩余内容中使用更准确的术语。 + +> 请记住,内部可变性仅会影响共享借用的规则,以便在共享时允许可变。它不能改变任意关于独占借用的规则。独占借用仍然保证没有任意激活的借用。导致超过一个活动的独占引用的不安全代码总是涉及未定义行为,不管内部可变性如何。 + +让我们看一看有着内部可变性的一些示例,以及如何通过共享引用允许可变性而不导致未定义行为。 + ### Cell +`std::cell::Cell` 仅是包裹了 T,但允许通过共享引用进行可变。为避免未定义行为,它仅允许你讲值复制出来(如果 T 实现 Copy)或者将其替换为另一个整体值。此外,它仅用于单个线程。 + +让我们看一看与上一节相似的示例,但是这一次使用 `Cell` 而不是 `i32`: + +```rust +use std::cell::Cell; + +fn f(a: &Cell, b: &Cell) { + let before = a.get(); + b.set(b.get() + 1); + let after = a.get(); + if before != after { + x(); // might happen + } +} +``` + +与上次不同,现在 if 条件有可能为真。因为 `Cell` 是内部可变的,只要我们有对它的共享引用,编译器不再假设它的值不再改变。a 和 b 可能引用相同的值,通过 b 也可能影响 a。然而,它可能同时假设没有其它线程同时获取 cell。 + +对 Cell 的限制并不总是容易处理的。因为它不能直接让我们借用它所持有的值,我们需要将值移动出去(让一些东西替换它的位置),修改它,然后将它放回去,以改变它的内容: + +```rust +fn f(v: &Cell>) { + let mut v2 = v.take(); // Replaces the contents of the Cell with an empty Vec + v2.push(1); + v.set(v2); // Put the modified Vec back +} +``` + ### RefCell -### Mutex 和 RwLock +与常规的 Cell 不同的是,`std::cell::RefCell` 与许你以很小的运行时花费去借用它的内容。`RefCell` 不仅持有 T,同时也持跟踪任何未解除的借用。如果你尝试在已经可变借用时尝试借用(或反之亦然),它会引发 panic,以避免出现未定义行为。就像 Cell,RefCell 只能在单个线程中使用。 + +借用 RefCell 的内容通过调用 `borrow` 或者 `borrow_mut` 完成: + +```rust +use std::cell::RefCell; + +fn f(v: &RefCell>) { + v.borrow_mut().push(1); // We can modify the `Vec` directly. +} +``` + +尽管 Cell 和 RefCell 有时是非常有用的,但是当我们使用多线程的时候,它们会变得无用。所以让我们继续讨论与并发相关的类型。 + +### 互斥锁[^4]和读写锁[^5] + +*读写锁*(RwLock)是 `RefCell` 的并发版本。`RwLock` 持有 T 并且跟踪任意未解除的借用。然而,与 RefCell 不同,它在冲突借用中不会 panic。相反,它会阻塞当先线程——使它进入睡眠——直到冲突借用消失才会唤醒。在其它线程完成后,我们仅需要耐心的等待轮到我们处理数据。 + +借用 RwLock 的内容称为*锁*。通过锁定它,我们临时阻塞并发的冲突借用,这允许我们没有导致数据竞争的借用它。 + +`Mutex` 是非常相似的,但是概念上是简单的。它不像 RwLock 跟踪共享借用和独占借用的数量,它仅允许独占借用。 + +我们将在[“锁:互斥锁和读写锁”](#锁互斥锁和读写锁)更详细地介绍这些类型。 ### Atomic +原子类型表示 Cell 的并发版本,是第 [2](./2_Atomics.md) 章和第 [3](./3_Memory_Ordering.md) 章的主题。与 Cell 相同,它们通过将整个值进行复制来避免未定义行为,而不直接让我们借用内容。 + +与 Cell 不同的是,它们不能是任意大小的。因此,任何 T 都没有通用的 `Atomic` 类型,但仅有特定的原子类型,例如 `AtomicU32` 和 `AtomicPtr`。哪些可用取决于平台,因为它们需要处理器的支持来避免数据竞争。(我们将在[第七章](./7_Understanding_the_Processor.md)研究这个问题。) + +因为它们的大小非常有限,原子类型通常不直接在线程之间共享所需的信息。相反,它们通常用作工具,是线程之间共享其它(通常是更大的)东西作为可能。当原子用于表示其它数据时,情况可能变得令人意外地复杂。 + ### UnsafeCell +`UnsafeCell` 是内部可变性的原始构建块。 + +`UnsafeCell` 包裹 T,但是没有附带任何条件和限制来避免未定义行为。相反,它的 `get()` 方法仅是给出了它包装值的原始指针,该值仅可以在 `unsafe` 块中使用。它以用户不会导致任何未定义行为的方式使用它。 + +更常见的是,不会直接使用 UnsafeCell,而是将它包裹在另一个类型,通过限制接口提供安全,例如 `Cell` 和 `Mutex`。所有有着内部可变性的类型——包括所有以上讨论的类型都建立在 UnsafeCell 之上。 + ## 线程安全:Send 和 Sync -## 锁:Mutex 和 RwLock +在这一章节中,我们已经看见一个不是*线程安全*的类型,这些类型仅用于一个单线程,例如 `Rc`、`Cell` 以及其它。由于需要这些限制来避免未定义行为,编译器需要为你理解和检查,因此你可以使用这些类型,而不必使用 unsafe 块。 + +该语言使用两种特殊的 trait 以更总这些类型可以安全地用作交叉线程: + +* *Send* + * 如果可以发送到另一个线程,则是 `Send` 类型。换句话说,是否该类型值的所有权可以转移到另一个线程。例如,`Arc` 是 `Send`,而 `Rc` 不是。 +* *Sync* + * 如果可以共享到另一个线程,则是 `Sync` 类型。换句话说,当且仅当对该类型的共享引用 `&T` 是 `Send` 的,类型 T 是 `Sync`。例如,i32 是 `Sync`,而 `Cell` 不是。(然而 `Cell` 是 `Send` 的) + +原始类型,例如 i32、bool 以及 str 都是 `Send` 和 `Sync`。 + +这两个 trait 会自动地为你实现 trait,这意味着他们会基于它们的字段为你的类型自动地实现。带有全部 `Send` 和 `Sync` 的结构体字段,本身也是 `Send` 和 `Sync`。 + +选择退出其中任何一种的方式是去增加没有实现该 trait 的字段到你的类型。为此,特殊的 `std::marker::PhantomData` 类型经常派上用场。该类型被编译器视为 T,除非它在运行时实际上不存在。它是零开销类型,不占用任何空间。 + +让我们来看看以下的结构体: + +```rust +use std::marker::PhantomData; + +struct X { + handle: i32, + _not_sync: PhantomData>, +} +``` + +在这个示例中,如果 `handle` 是它唯一的字段,`X` 将是 `Send` 和 `Sync`。然而,我们增加一个零开销带下的 `PhantomData>` 字段,该字段被视为 `Cell<()>`。因为 `Cell<()>` 字段不是 Sync,X 也将不是。但它仍然是 Send,因为所有字段都实现了 Send。 + +原始指针(`*const T` 和 `*mut T`)既不是 Send 也不是 Sync,因为编译器不了解他们表示什么。 + +选择任意 trait 的方式和使用任意其它 trait 相同;使用一个 impl 为你的类型实现 trait: + +```rust +struct X { + p: *mut i32, +} + +unsafe impl Send for X {} +unsafe impl Sync for X {} +``` + +注意,实现这些 trait 需要 `unsafe` 关键字,因为编译器不能为你检查它是否正确。这是你对编译器作出的承诺,你不得不信任它。 + +如果你尝试去移动一些未实现 Send 的值进入另一个线程,编译器将阻止你这样做。有一个小的示例去演示: + +```rust +fn main() { + let a = Rc::new(123); + thread::spawn(move || { // Error! + dbg!(a); + }); +} +``` + +这里,我们尝试去发送 `Rc` 到一个新线程,但是 `Rc` 与 `Arc` 不同,它没有实现 Send。 + +如果我们尝试去编译以上示例,我们将面临看山区像这样的错误: + +```txt +error[E0277]: `Rc` cannot be sent between threads safely + --> src/main.rs:3:5 + | +3 | thread::spawn(move || { + | ^^^^^^^^^^^^^ `Rc` cannot be sent between threads safely + | + = help: within `[closure]`, the trait `Send` is not implemented for `Rc` +note: required because it's used within this closure + --> src/main.rs:3:19 + | +3 | thread::spawn(move || { + | ^^^^^^^ +note: required by a bound in `spawn` +``` + +`thread::spawn` 函数需要它的参数实现 Send,并且只有当其所有的捕获都是 Send,闭包才是 Send。如果我们尝试捕获不是 Send 的实现,就会捕捉我们的错误,保护我们避免未定义行为的影响。 + +## 锁:互斥锁和读写锁 + +在线程之间共享(可变)数据更常规的有用工具是 `mutex`,它是“互斥”(mutual exclusion)的缩写。mutex 的工作是通过暂时阻塞其它试图同时访问某些数据的线程,来确保线程对某些数据进行独占访问。 + +概念上,mutex 仅有两个状态:锁定和释放。当线程锁住一个未释放的 mutex,mutex 被标记为锁定,线程可以立即继续。当线程尝试锁住一个已锁定的 mutex,操作将*阻塞*。当线程等待 mutex 释放时,其会置入睡眠状态。释放仅能在锁定的 mutex 上进行,并且应当由锁定它的同一线程完成。如果其它线程正在等待锁定 mutex,释放将导致唤醒其中一个线程,因此它可以尝试再次锁定 mutex 并且继续它的过程。 + +使用 mutex 保护数据仅是所有线程之间的约定,当它们在 mutex 锁定时,它们才能获取数据。这种方式,没有两个线程可以并发地获取数据和导致数据竞争。 + +### Rust 的互斥锁 + +Rust 的标准库通过 `std::sync::Mutex` 提供这个功能。它对类型 T 进行范型化,该类型 T 是 mutex 所保护的数据类型。通过将 T 作为 mutex 的一部分,该数据仅可以通过 mutex 获取,从而提供一个安全的接口,以保证所有线程都遵守这个约定。 + +为确保锁定的 mutex 仅通过锁定它的线程释放,所以它没有 `unlock()` 方法。然而,它的 `lock()` 方法返回一个称为 `MutexGuard` 的特殊类型。该 guard 表示保证我们已经锁定 mutex。它通过 `DerefMut` trait 行为表现像一个独占引用,使我们能够独占访问互斥体保护的数据。释放 mutex 通过 drop guard 完成。当我们 drop guard 时,我们我们放弃了获取数据的能力,并且 guard 的 `Drop` 实现将释放 mutex。 + +让我们看一个示例,实践中的 mutex: + +```rust +use std::sync::Mutex; + +fn main() { + let n = Mutex::new(0); + thread::scope(|s| { + for _ in 0..10 { + s.spawn(|| { + let mut guard = n.lock().unwrap(); + for _ in 0..100 { + *guard += 1; + } + }); + } + }); + assert_eq!(n.into_inner().unwrap(), 1000); +} +``` + +在这里,我们有一个 `Mutex`,一个保护整数的 mutex,并且我们产生十个线程,每个整数增加线程 100 倍。每个线程将首先锁定 mutex 去获取 MutexGuard,并且然后使用 guard 去获取整数并修改它。当该变量超出作用域后,guard 会立即隐式 drop。 + +线程完成后,我们可以通过 `into_inner()` 安全地从整数中移除保护。`into_inner` 方法获取 mutex 的所有权,这确保了没有其它东西可以引用 mutex,从而使 mutex 变得不再必要。 + +尽管增加是逐步地增加的,但是线程仅能够看见 100 的倍数,因为它只能在 mutex 释放时查看整数。实际上,由于 mutex 的存在,这一百次递增称为了一个单一不可分割的原子操作。 + +为了清晰地看见 mutex 的效果,我们可以让每个线程在释放 mutex 之前等待一秒: + +```rust +use std::time::Duration; + +fn main() { + let n = Mutex::new(0); + thread::scope(|s| { + for _ in 0..10 { + s.spawn(|| { + let mut guard = n.lock().unwrap(); + for _ in 0..100 { + *guard += 1; + } + thread::sleep(Duration::from_secs(1)); // New! + }); + } + }); + assert_eq!(n.into_inner().unwrap(), 1000); +} +``` + +当你现在运行程序,你将看见大约需要花费 10s 才能完成。每个线程仅等待 1s,但是 mutex 确保一次仅有一个线程这么做。 + +如果我们在睡眠 1s 之前 drop guard,并且因此释放 mutex,我们将看到并行发生: + +```rust +fn main() { + let n = Mutex::new(0); + thread::scope(|s| { + for _ in 0..10 { + s.spawn(|| { + let mut guard = n.lock().unwrap(); + for _ in 0..100 { + *guard += 1; + } + drop(guard); // New: drop the guard before sleeping! + thread::sleep(Duration::from_secs(1)); + }); + } + }); + assert_eq!(n.into_inner().unwrap(), 1000); +} +``` -### Rust 的 Mutex +有了这些变化,这个程序大约仅需要 1s,因为 10 个线程现在可以同时执行 1s 的睡眠。这表明了 mutex 锁定时间保持尽可能短的重要性。将 mutex 锁定时间超过必要时间可能会完全抵消并行带来的好处,实际上会强制所有操作按顺序执行。 -### 锁毒化 +### 锁中毒(posion) -### Reader-Writer Lock +上述示例中 `unwarp()` 调用和*锁中毒*有关。 -## 等待: Parking 和条件变量 +当线程在持有锁时 panic,Rust 中的 mutex 将被标记为*中毒*。当这种情况发生时,Mutex 将不再被锁定,但调用它的 `lock` 方法将导致 `Err`,以表明它已经中毒。 -### Thread Parking +这是一个防止由 mutex 保护的数据处于不一致状态的机制。在我们上面的示例中,如果一个线程将在整数增加不到 100 之后崩溃,mutex 将释放并且整数将处于一个意外的状态,它不再是 100 的倍数,这可能打破其它线程的设定。在这种情况下,自动标记 mutex 中毒,强制用户处理这种可能。 + +在中毒的 mutex 上调用 `lock()` 仍然可能锁定 mutex。由 `lock()` 返回的 Err 包含 `MutexGuard`,允许我们在必要时纠正不一致的状态。 + +虽然锁中毒是一种强大的机制,在实践中,从潜在的不一致状态恢复并不常见。如果锁中毒,大多数代码要么忽略了中毒或者使用 `unwrap()` 去 panic,这有效地将 panic 传递给 mutex 的所有用户。 + +
+

MutexGuard 的生命周期

+ 尽管隐式 drop guard 释放 mutex 很方便,但是它有时会导致微妙的意外。如果我们使用 let 语句授任 guard 一个名字(正如我们上面的示例),看它什么时候会被丢弃相对简单,因为局部变量定义在它们作用域范围的末尾。然而,正如上述示例所示,不明确地 drop guard 可能导致 mutex 锁定的时间超过所需时间。 + + 在不给它指定名称的情况下使用 guard 也是可能的,并且有时非常方便。因为 MutexGuard 保护数据的行为像独占引用,我们可以直接使用它,而无需首先为他授任一个名称。例如,你有一个 Mutex>,你可以在单个语句中锁定 mutex,将项推入 Vec,并且再次锁定 mutex: + +
list.lock().unwrap().push(1);
+ + 任何更大表达式产生的临时值,例如通过 lock() 返回的 guard,将在语句结束后被 drop。尽管这似乎显而易见,但它导致了一个常见的问题,这通常涉及 matchif let 以及 while let 语句。以下是遇到该陷阱的示例: + +
if let Some(item) = list.lock().unwrap().pop() {
+    process_item(item);
+}
+ + 如果我们的旨意就是锁定 list、弹出 item、释放 list 然后在释放 list 后处理 item,我们在这里犯了一个微妙而严重的错误。临时的 guard 直到完整的 if let 语句结束后才能被 drop,这意味着我们在处理 item 时不必要地持有锁。 + + 或许,意外地是,对于类似地 if 语句,这并不会发生,例如以下示例: + +
if list.lock().unwrap().pop() == Some(1) {
+    do_something();
+}
+ + 在这里,临时的 guard 在 if 语句的主题执行之前就执行 drop 了。该原因是,通常 if 语句的条件总是一个布尔值,它并不能借用任何东西。没有理由将临时的生命周期从条件开始延长到语句的结尾。对于 if let 语句,情况可能并非如此。例如,如果我们使用 `front()`,而不是 `pop()`,项将会从 list 中借用,因此有必要保持 guard 存在。因为借用检查实际上只是一种检查,它并不会影响何时以及什么顺序 drop,所以即使我们使用了 pop(),情况仍然是相同的,尽管那并不是必须的。 + + 我们可以通过将弹出操作移动到单独的 let 语句来避免这种情况。然后在该语句的末尾放下 guard,在 if let 之前: + +
let item = list.lock().unwrap().pop();
+if let Some(item) = item {
+    process_item(item);
+}
+
+ +### 读写锁 + +互斥锁仅涉及独占访问。MutexGuard 将提供受保护数据的一个独占引用(`&mut T`),即使我们仅想要查看数据,并且共享引用(`&T`)就足够了。 + +读写锁是一个略微更复杂的 mutex 版本,它能够区分独占访问和共享访问的区别,并且可以提供两种访问方式。它有三种状态:释放、由单个 *writer* 锁定(用于独占访问)以及由任意数量的 reader 锁定(用于共享访问)。它通常用于通常由多个线程读取的数据,但只是偶尔一次。 + +Rust 标准库通过 `std::sync::RwLock` 类型提供该锁。它与标准库的 Mutex 工作类似,只是它的接口大多是分成两个部分。然而,单个 `lock()` 方法,它有 `read()` 和 `write()` 方法,用于为 reader 或 writer 进行锁定。它还附带了两种 guard 类型,一种用于 reader,一种用于 writer:RwLockReadGuard 和 RwLockWriteGuard。前者只实现了 Deref,其行为像受保护数据共享引用,后者还实现了 DerefMut,其行为像独占引用。 + +它实际上是 `RefCell` 的多线程版本,动态地跟踪引用的数量以确保借用规则得到维护。 + +`Mutex` 和 `RwLock` 都需要 T 是 Send,因为它们可能发送 T 到另一个线程。除此之外,`RwLock` 也需要 T 实现 Sync,因为它允许多个线程对受保护的数据持有共享引用(`&T`)。(严格地说,你可以创建一个并没有实现这些需求 T 的锁定,但是你不能在线程之间共享它,因为锁本身并没有实现 Sync)。 + +Rust 标准库仅提供一种通用的 `RwLock` 类型,但它的实现依赖于操作系统。读写锁之间有很多细微差别。当有 writer 等待时,即使当锁已经读取锁定时,很多实现将阻塞新的 reader。这样做是为了防止 *writer 挨饿*,在这种情况下,很多 reader 将共同持有锁而导致锁从不释放,从而不允许任何 writer 更新数据。 + +
+

在其他语言中的互斥锁

+ Rust 标准的 Mutex 和 RwLock 类型与你在其它语言(例如 C、C++)发现的看起来有一点不同。 + + 最大的区别是,Rust 的 Mutex 数据包含它正在保护的数据。例如,在 C++ 中,std::mutex 并不包含着它保护的数据,甚至不知道它在保护什么。这意味着,用户有指责记住哪些数据由 mutex 保护,并且确保每次访问“受保护”的数据都锁定正确的 mutex。注意,当读取其它语言涉及到 mutex 的代码,或者与不熟悉 Rust 程序员沟通时,非常有用。Rust 程序员可能讨论关于“数据在 mutex 之中”,或者说“mutex 中包裹数据”这类话,这可能让只熟悉其它语言 mutex 的程序员感到困惑。 + + 如果你真的需要一个不包含任何内容的独立 mutex,例如,保护一些外部硬件,你可以使用 Mutex<()>。但即使是这种情况,你最好定义一个(可能 0 大小开销)的类型来与该硬件对接,并将其包裹在 Mutex 之中。这样,在与硬件交互之前,你仍然可以强制锁定 mutex。 +
+ +## 等待: 阻塞(Park)和条件变量 + +当数据由多个线程更改时,在许多情况下,它们需要等待一些事件,以便管有数据的某些条件变为真。例如,如果我们有一个保护 Vec 的 mutex,我们可能想要等待直到它包含任何东西。 + +尽管 mutex 允许线程等待直到它释放,但它不提供等待任何其它条件的功能。如果我们只拥有一个 mutex,我们不得不持有锁定的 mutex,以反复检查 Vec 中是否有任意东西。 + +### 线程阻塞 + +一种方式是去等待来自另一个线程的通知,其被称为*线程阻塞*。一个线程可以阻塞它自己,将它置入睡眠状态,阻止它消耗任意 CPU 周期。然后,另一个线程可以释放阻塞的线程,将其从睡眠中唤醒。 + +线程阻塞可以通过 `std::thread::park()` 函数获得。对于释放,你可以在 `Thread` 对象中调用 `unpark()` 函数表示你想要释放该线程。这样的对象可以通过 spawn 返回的 join 句柄获得,或者也可以通过 `std::thread::current()` 从线程本身中获得。 + +让我们深入研究在线程之间使用 mutex 共享队列的示例。在以下示例中,一个新产生的线程将消费来自队列的项,尽管主线程将每秒插入新的项到队列。线程阻塞被用于在队列为空时使消费线程等待。 + +```rust +use std::collections::VecDeque; + +fn main() { + let queue = Mutex::new(VecDeque::new()); + + thread::scope(|s| { + // Consuming thread + let t = s.spawn(|| loop { + let item = queue.lock().unwrap().pop_front(); + if let Some(item) = item { + dbg!(item); + } else { + thread::park(); + } + }); + + // Producing thread + for i in 0.. { + queue.lock().unwrap().push_back(i); + t.thread().unpark(); + thread::sleep(Duration::from_secs(1)); + } + }); +} +``` + +消费线程进行一个无穷尽的循环,它将项弹出队列,使用 `dbg` 宏展示它们。当队列为空的时候,它停止并且使用 `park()` 函数进行睡眠。如果它得到释放,`park()` 调用将返回,循环继续,再次从队列中弹出项,直到它是空的。等等。 + +生产线程将其推入队列,每秒产生一个新的数字。每次增加一个项时,它都会在 Thread 对象上使用 `unpark()` 方法,该方法引用消费线程来释放它。这样,消费线程就会被唤醒处理新的元素。 + +这样,我们要作出的一个重要的观测是,如果我们移除**阻塞**,程序将在理论上正确,尽管效率低下。这是重要的,因为 `park()` 不能保证它将由于匹配 `unpark()` 而返回。尽管有些罕见,但它很可能会有*虚假唤醒*。我们的示例处理得很好,因为消费线程将锁定去咧,可以看到它是空的,然后直接释放它并再次阻塞。 + +线程阻塞的一个重要属性是,在线程自己进入阻塞之前,对 `unpark()` 的调用不会丢失。对 unpark 的请求仍然被记录下来,并且下次线程尝试挂起自己的时候,它会清除该请求并且直接继续执行,实际上并不会进入睡眠状态。为了理解这对于正确操作的关键性,让我们来看一下程序可能执行步骤的顺序: + +1. 消费线程(让我们称之为 C)锁定队列。 +2. C 尝试去从队列中弹出项,但是它是空的,导致 None。 +3. C 释放队列。 +4. 生产线程(我们将称为 P)锁定队列。 +5. P 推入一个新的项进入队列。 +6. P 再次释放队列。 +7. P 调用 `unpark()` 去通知 C,有一些新的项。 +8. C 调用 `park()` 去睡眠,以等待更多的项。 + +虽然在步骤 3 释放队列和在步骤 8 阻塞之间很可能仅有一个很短的时间,但第 4 步和第 7 步可能在线程阻塞自己之前发生。如果 `unpark()` 在线程没有挂起时不执行任何操作,那么通知将会丢失。即使队列中有项,消费线程仍然在等待。由于 unpark() 请求被保存,以供将来调用 park() 时使用,我们不必担心这个问题。 + +然而,unpark 请求并不会堆起来。先调用两次 `unpark()`,然后再调用两次 `park()`,线程仍然会进入睡眠状态。第一次 `park()` 清除请求并直接返回,但第二次调用通常让它进入睡眠。 + +这意味着,在我们上面的示例中,重要的是我们看见队列为空的时候,我们仅会阻塞线程,而不是在处理每个项之后将其阻塞。然而由于巨长的(1s)睡眠,这种情况在本示例中几乎不可能发生,但多个 `unpark()` 调用仅能唤醒单个 `park()` 调用。 + +不幸的是,这确实意味着,如果在 `park()` 返回后,立即调用 `unpark()`,但是在队列得到锁定并清空之前,`unpark()` 调用是不必要的,单仍然会导致下一个 `park()` 调用立即返回。这导致(空的)队列多次被锁定并释放。虽然这不会影响程序的正确性,但这确实会影响它的效率和性能。 + +这种机制在简单的情况下是好的,比如我们的示例,但是当东西变得复杂,情况可能会很糟糕。例如,如果我们有多个消费线程从相同的队列获取项时,生产线程将不会知道有哪些消费者实际上在等待以及应该被唤醒。生产者将必须知道消费者正在等待的时间以及正在等待的条件。 ### 条件变量 +条件变量是一个更通用的选贤,用于等待受 mutex 保护的数据发生变化。它有两种基本操作:等待和通知。线程可以在条件变量上等待,然后在另一个线程通知相同条件变量时被唤醒。多个线程可以在通向的条件变量上等待,通知可以发送给一个等待线程或者所有等待线程。 + +这意味着我们可以为我们感兴趣的事件或条件创建一个条件变量,例如,队列是非空的,并且在该条件下等待。任意导致事件或条件发生的线程都会通知条件变量,无需知道那个或有多个线程对该通知感兴趣。 + +为了避免在释放 mutex 和等待条件变量的短暂时间失去通知的问题,条件变量提供了一种*原子地*释放 mutex 和开始等待的方式。这意味着根本没有通知丢失的时刻。 + +Rust 标准库提供了 `std::sync::Condvar` 作为条件变量。它的等待方法采用 `MutexGuard`,以保证我们已经锁定 mutex。它首先释放 mutex 并进入睡眠。稍后,当唤醒时,它重新锁定 mutex 并且返回一个新的 MutexGuard(这证明了 mutex 再次被锁定)。 + +它有两个通知方法:`notify_one` 仅唤醒一个线程(如果有),和 `notify_all` 去唤醒所有线程。 + +让我们改用 Condvar 修改我们用于线程停放的示例: + +```rust +use std::sync::Condvar; + +let queue = Mutex::new(VecDeque::new()); +let not_empty = Condvar::new(); + +thread::scope(|s| { + s.spawn(|| { + loop { + let mut q = queue.lock().unwrap(); + let item = loop { + if let Some(item) = q.pop_front() { + break item; + } else { + q = not_empty.wait(q).unwrap(); + } + }; + drop(q); + dbg!(item); + } + }); + + for i in 0.. { + queue.lock().unwrap().push_back(i); + not_empty.notify_one(); + thread::sleep(Duration::from_secs(1)); + } +}); +``` + +* 我们必须改变一些事情: + * 我们现在不仅有一个包含队列的 Mutex,同时有一个 Condvar 去通信“不为空”的条件。 + * 我们不再需要知道要唤醒哪个线程,因此我们不再存储 spawn 的返回值。而是,我们通过使用 `notify_one` 方法的条件变量通知消费者。 + * 释放、等待以及重新锁定都是通过 `wait` 方法完成的。我们不得不稍微重组控制流,以便出阿迪 guard 到 wait 方法,同时在处理项之前仍然 drop 它。 + +现在,我们可以根据自己的需求生成尽可能多的消费线程,甚至稍后生成更多线程,而无需更改任何东西。条件变量会负责将通知传递给任何感兴趣的线程。 + +如果我们有个更加复杂的系统,其线程对不同条件感兴趣,我们可以为每个条件定义一个 `Condvar`。例如,我们能定义一个来指示队列是非空的并且另一个指示它是空的。然后,每个线程将等待与它们正在做的事情相关的条件。 + +通常,Condvar 仅能与单个 Mutex 一起使用。如果两个线程尝试使用两个不同的 mutex 去并发地等待条件变量,它可能导致 panic。 + +Condvar 的缺点是,它仅能与 Mutex 一起工作,对于大多数用例是没问题的,因为已经在保护数据时使用了 mutex。 + +`thread::park()` 和 `Condvar::wait()` 也都有一个有时间限制的变体:`thread::park_timeout()` 和 `Condvar::wait_timeout()`。它们接受一个额外的参数 Duration,表示在多长时间后放弃等待通知并无条件地唤醒。 + ## 总结 + +* 多线程可以并发地运行在相同程序并且可以在任意时间生成。 +* 当主线程结束,主程序结束。 +* 数据竞争是未定义行为,它会由 Rust 的类型系统完全地组织(在安全的代码中)。 +* 常规的线程可以像程序运行一样长时间,并且因此只能借用 `'static` 数据。例如静态和泄漏分配。 +* 引用计数(Arc)可以用于共享所有权,以确保只要有一个线程使用它,数据就会存在。 +* 作用域线程用于限制线程的生命周期,以允许其借用非 `'static` 数据,例如作用域变量。 +* `&T` 是*共享引用*。`&mut T` 是*独占引用*。常规类型不允许通过共享引用可变。 +* 一些类型有着内部可变性,这要归功于 `UnsafeCell`,它允许通过共享引用改变。 +* Cell 和 RefCell 是单线程内部可变性的标准类型。Atomic、Mutex 以及 RwLock 是它们多线程等价物。 +* Cell 和原子类型仅允许作为整体替换值,而 RefCell、Mutex 和 RwLock 允许你通过动态执行访问规则直接替换值。 +* 线程阻塞可以是等待某种条件的便捷方式。 +* 当条件是关于由 Mutex 保护的数据时,使用 `Condvar` 时更方便的,并且比线程阻塞更有效。 + +[^1]: +[^2]: +[^3]: +[^4]: +[^5]: +[^6]: diff --git a/9_Building_Our_Own_Locks.md b/9_Building_Our_Own_Locks.md index 2e9c682..db4cb1e 100644 --- a/9_Building_Our_Own_Locks.md +++ b/9_Building_Our_Own_Locks.md @@ -2,13 +2,13 @@ 在该章节,我们将建造属于我们自己的互斥锁(`mutex`)[^3]、条件变量(`condition variable`)[^4]以及读写锁(`reader-writer lock`)[^5]。对于它们中的任何一个,我们都会从一个非常基础的版本开始,然后扩展它以使其更高效。 -因为我们并不会使用来自标准库中的锁类型(因为这将是作弊行为),因此我们将不得不使用来自[第八章](./8_Operating_System_Primitives.md)的工具,才能够在不忙碌循环(`busy-looping`[^1])的情况下使线程等待。然而,正如我们在该章节看到的,操作系统提供的可用工具因平台而异,因此很难去构建跨平台工作的东西。 +由于我们并不会使用来自标准库中的锁类型(因为这将是作弊行为),因此我们将不得不使用来自[第八章](./8_Operating_System_Primitives.md)的工具,才能够在不忙碌循环(`busy-looping`[^1])的情况下使线程等待。然而,正如我们在该章节看到的,操作系统提供的可用工具因平台而异,因此很难去构建跨平台工作的东西。 幸运地是,更多现代化操作系统都支持类似 `futex` 的功能,或者至少支持唤醒(`wake`)和等待(`wait`)操作。正如我们在[第八章](./8_Operating_System_Primitives.md)看到的,Linux 自从 2003 年就一直支持 futex 系统调用,Winodws 自从 2012 年就支持 `WaitOnAddress` 系列功能,FreeBSD 自从 2016 年就将 `_umtx_op` 作为系统调用的一部分,等等。 最让人意外的是 macOS。尽管它的内核支持这些操作,但是它并没有暴露任意稳定、公共的 C 函数给我们使用。然而,macOS 附带了一个最新版本的 `libc++`(这是一个 C++ 标准库的实现)。该标准库包含对 C++20 的支持,该版本内置了非常基础对原子等待和唤醒操作(像 `std::atomic::wait()`)。尽管由于各种原因,Rust 利用这些还非常的棘手,然而,这当然是可能的,这也可以让我们在 macOS 上访问基本的像 futex 的等待和唤醒功能。 -我们将不再深入研究哪些复杂的细节,而是选择利用 `crates.io` 的 `atomic-wait` crate,为我们的「锁」原语提供基础的构建模块。该 crate 提供了三个函数:`wait()`、`wake_one()` 以及 `wake_all()`。它使用我们上面讨论的特定于平台规范的实现,为所有主要的平台实现了这些功能。这意味着我们只要坚持使用这三个函数,我们不再需要考虑任何平台的特定细节。 +我们将不再深入研究那些复杂的细节,而是选择利用 `crates.io` 的 `atomic-wait` crate,为我们的「锁」原语提供基础的构建模块。该 crate 提供了三个函数:`wait()`、`wake_one()` 以及 `wake_all()`。它使用我们上面讨论的特定于平台规范的实现,为所有主要的平台实现了这些功能。这意味着我们只要坚持使用这三个函数,我们不再需要考虑任何平台的特定细节。 这些函数的行为就像我们在 Linux [第八章中的“Futex”](./8_Operating_System_Primitives.md#Futex)中实现的同名函数一样,不过让我们快速回顾一下如何工作的。 @@ -30,7 +30,7 @@ > 在[第八章中的“Futex”](./8_Operating_System_Primitives.md#Futex),我们讨论了一个最小示例,以展示这些函数在实践中是如何使用的。如果你已经忘记,请务必在继续之前查看该示例。 -为了使用 atomic-wait crate,在你的 `Cargo.toml` 中增加 `atomic-wait="1"` 到 `[dependencies]`;或者运行 `cargo add atomic-wait@1`,这样也同样位你做到这点。这三个函数在 crate 的根中定义,你可以使用 `atomic_wait::{wait, wake_one, wake_all};` 导入它们。 +为了使用 atomic-wait crate,在你的 `Cargo.toml` 中增加 `atomic-wait="1"` 到 `[dependencies]`;或者运行 `cargo add atomic-wait@1`,这样也同样为你做到这点。这三个函数在 crate 的根中定义,你可以使用 `atomic_wait::{wait, wake_one, wake_all};` 导入它们。 > 当你阅读到这篇文章时,该 crate 可能有后续的可用版本,但该章节进使用主版本为 1 的构建。后续的版本可能有不兼容的接口。 @@ -51,7 +51,7 @@ pub struct Mutex { } ``` -就像是自旋锁一样,我们也需要承诺 `Mutex` 也可以在线程之间共享,即使它包含一个可怕的 `UnsafeCell`: +就像是自旋锁一样,我们也需要保证 `Mutex` 可以在线程之间共享,即使它包含一个可怕的 `UnsafeCell`: 我们将增加一个 `MutexGuard` 类型,该类型实现了 `Deref` trait,以提供一个完全安全的锁接口,就像我们在[第四章:使用锁守卫的安全接口](./4_Building_Our_Own_Spin_Lock.md#使用锁守卫的安全接口): @@ -414,7 +414,7 @@ impl Condvar { ![ ](./picture/raal_0902.png) *图 9-2。一个线程使用 `Condvar::wait()` 被另一个使用 `Condvar::notify_one()` 的线程唤醒的操作和 happens-before 的关系。* -我们应该也考虑如果 conter 溢出会发生什么。 +我们应该也考虑如果 counter 溢出会发生什么。 只要每次通知之后计数器是不同的,它的真实值就无关紧要。不幸的是,在超过 40 亿个通知之后,计数器将溢出,并以 0 重新启动,回到之前使用过的值。从技术上讲,我们的 `Condvar::wait()` 实现可能在不应该的时候进入睡眠状态:如果它正好错过了 4,292,967,296 条通知(或者任意它的倍数),它会溢出计数器,直到它之前拥有的值。 diff --git a/README.md b/README.md index 6048a7a..4703946 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,10 @@ * [线程安全:Send 和 Sync](./1_Basic_of_Rust_Concurrency.md#线程安全send-和-sync) * [锁:Mutex 和 RwLock](./1_Basic_of_Rust_Concurrency.md#锁mutex-和-rwlock) * [Rust 的 Mutex](./1_Basic_of_Rust_Concurrency.md#rust-的-mutex) - * [锁毒化](./1_Basic_of_Rust_Concurrency.md#锁毒化) + * [锁中毒](./1_Basic_of_Rust_Concurrency.md#锁中毒posion) * [Reader-Writer Lock](./1_Basic_of_Rust_Concurrency.md#reader-writer-lock) -* [等待:Parking 和条件变量](./1_Basic_of_Rust_Concurrency.md#等待-parking-和条件变量) - * [Thread Parking](./1_Basic_of_Rust_Concurrency.md#thread-parking) +* [等待:阻塞和条件变量](./1_Basic_of_Rust_Concurrency.md#等待-阻塞park和条件变量) + * [线程阻塞](./1_Basic_of_Rust_Concurrency.md#线程阻塞) * [条件变量](./1_Basic_of_Rust_Concurrency.md#条件变量) * [总结](./1_Basic_of_Rust_Concurrency.md#总结) @@ -137,10 +137,10 @@ * [RCU](./10_Ideas_and_Inspiration.md#rcu) * [无锁链表](./10_Ideas_and_Inspiration.md#无锁链表) * [基于队列的锁](./10_Ideas_and_Inspiration.md#基于队列的锁) -* [基于 Parking 的锁](./10_Ideas_and_Inspiration.md#基于-parking-的锁) +* [基于阻塞的锁](./10_Ideas_and_Inspiration.md#基于阻塞的锁) * [顺序锁](./10_Ideas_and_Inspiration.md#顺序锁spinlock) * [教学材料](./10_Ideas_and_Inspiration.md#教学材料) -注明:本文译自 ,若其它平台引用此翻译,也请注明出处。 +注明:本文译自 ,若其它平台引用此翻译,也请注明出处。 翻译进行中...