diff --git a/1_Basic_of_Rust_Concurrency.md b/1_Basic_of_Rust_Concurrency.md index abe7f5a..78887ad 100644 --- a/1_Basic_of_Rust_Concurrency.md +++ b/1_Basic_of_Rust_Concurrency.md @@ -1,6 +1,6 @@ # 第一章:Rust 并发基础 -早在多核处理器司空见惯之前,操作系统就允许一台计算机运行多个程序。这是通过在进程之间快速切换来完成的,允许每个进程逐个地重重地取得一点进展。现在,几乎所有的电脑,甚至手机和手表都有着多核处理器,可以真正并行执行多个程序。 +早在多核处理器司空见惯之前,操作系统就允许一台计算机运行多个程序。这是通过在进程之间快速切换来完成的,允许每个进程逐个地逐次取得一点进展。现在,几乎所有的电脑,甚至手机和手表都有着多核处理器,可以真正并行执行多个程序。 操作系统尽可能的将进程之间隔离,允许程序完全意识不到其他线程在做什么的情况下做自己的事情。例如,在不先询问操作系统内核的情况下,一个进程通常不能获取其他进程的内存,或者以任意方式与之通信。 @@ -14,7 +14,7 @@ 每个程序都从一个线程开始:主(main)线程。该线程将执行你的 main 函数,并且如果需要,它可以用于产生更多线程。 -在 Rust 中,新线程使用来自标准库的 `std::thread::spawn` 函数产生。它接受一个参数:新线程执行的函数。一旦该函数停止,将立刻返回。 +在 Rust 中,新线程使用来自标准库的 `std::thread::spawn` 函数产生。它接受一个参数:新线程执行的函数。一旦该函数返回,线程就会停止。 让我们看一个示例: @@ -40,7 +40,7 @@ fn f() {

Thread ID

- Rust 标准库位每一个线程分配一个唯一的标识符。此标识符可以通过 Thread::id() 访问并且拥有 ThreadId 类型。除了复制 ThreadId 以及检查它们相等外,你也做不了什么。不能保证这些 ID 将会连续分配,只是每个线程都会有所不同。 + Rust 标准库位每一个线程分配一个唯一的标识符。此标识符可以通过 Thread::id() 访问并且拥有 ThreadId 类型。除了复制 ThreadId 以及检查它们是否相等外,你也做不了什么。不能保证这些 ID 将会连续分配,只是每个线程都会有所不同。
如果你运行我们上面的示例几次,你可能注意到输出在运行之间有所不同。一次特定的运行在机器上的输出: @@ -51,15 +51,15 @@ Hello from another thread! This is my thread id: ``` -惊讶地是,部分输出似乎失去了。 +惊讶的是,部分输出似乎丢失了。 -这里发生的情况是:新的线程结束执行它们的函数之前,主线程结束执行了主函数。 +这里发生的情况是:新的线程完成其函数的执行之前,主线程完成了主函数的执行。 -从主函数返回将退出整个程序,即使所有线程仍然在运行。 +从主函数返回将退出整个程序,即使其它线程仍然在运行。 在这个特定的示例中,在程序被主线程关闭之前,其中一个新的线程有足够的消息到达第二条消息的一半。 -如果我们想要在主函数返回之前,确保线程结束,我们可以通过 `join` 它们来等待。未来这样做,我们在 `spawn` 函数返回后使用 `JoinHandle`: +如果我们想要线程在主函数返回之前完成执行,我们可以通过 `join` 它们来等待。未来这样做,我们使用 `spawn` 函数返回的 `JoinHandle`: ```rust fn main() { @@ -75,7 +75,7 @@ fn main() { `.join()` 方法等待直到线程结束执行并且返回 `std::thread::Result`。如果线程由于 panic 不能成功地完成它的函数,这将包含 panic 消息。我们试图去处理这种情况,或者为 join panic 的线程调用 `.unwrap()` 去 panic。 -运行我们程序的这个版本,将不再导致截断的输出: +运行我们程序的这个版本,将不再导致输出被截断: ```txt Hello from the main thread. @@ -97,7 +97,7 @@ This is my thread id: ThreadId(3)

输出锁定

- println 宏使用 std::io::Stdout::lock() 去确保输出没有被中断。println!() 将等待直到任意并发地运行完成后,在写入输出。如果不是这样,我们可以得到更多的交叉输出: + println 宏使用 std::io::Stdout::lock() 去确保输出没有被中断。println!() 表达式将等待直到任意并发的表达式运行完成后,再写入输出。如果不是这样,我们可能得到更多的交错输出:
   Hello fromHello from another thread!
@@ -107,7 +107,7 @@ This is my thread id: ThreadId(3)
   id: ThreadId(3)
-与其将函数的名称传递给 `std::thread::spawn`,不如像我们上面的示例那样,传递一个*闭包*。这允许我们捕获值移动到新的线程: +与其将函数的名称传递给 `std::thread::spawn`(像我们上面的示例那样),不如传递一个*闭包*。这允许我们捕获值并移动到新的线程: ```rust let numbers = vec![1, 2, 3]; @@ -119,11 +119,11 @@ thread::spawn(move || { }).join().unwrap(); ``` -在这里,numbers 的所有权被转移到新产生的线程,因为我们使用了 `move` 闭包。如果我们没有使用 `move` 关键字,闭包将会通过引用补货 numbers。这将导致一个编译器错误,因为新的线程超出变量的生命周期。 +在这里,numbers 的所有权被转移到新产生的线程,因为我们使用了 `move` 闭包。如果我们没有使用 `move` 关键字,闭包将会通过引用捕获 numbers。这将导致一个编译器错误,因为新的线程比变量的生命周期更长。 由于线程可能运行直到程序执行结束,因此产生的线程在它的参数类型上有 `'static` 生命周期绑定。换句话说,它只接受永久保留的函数。闭包通过引用捕获局部变量不能够永久保留,因为当局部变量不存在时,引用将变得无效。 -从线程中取回一个值,是从闭包中返回完成的。该返回值通过 `join` 方法返回的 `Result` 中获取: +从线程中取回一个值,是从闭包中返回值来完成的。该返回值可以通过 `join` 方法返回的 `Result` 中获取: ```rust let numbers = Vec::from_iter(0..=1000); @@ -149,14 +149,14 @@ println!("average: {average}");

std::thread::Builder 允许你在产生线程之前为新线程设置一些设置。你可以使用它为新线程配置栈大小并给新线程一个名字。线程的名字是可以通过 std::thread::current().name() 获得,这将在 panic 消息中可用,并在监控和大多数雕饰工具中可见。

-

此外,Builder 的产生函数返回一个 std::io::Result,允许你处理新线程失败的情况。如果操作系统内存不足,或者资源限制已经应用于你对程序,这是可能发生的。如果 std::thread::spawn 函数不能去产生一个新线程,它只会 panic。

+

此外,Builder 的产生函数返回一个 std::io::Result,允许你处理产生新线程失败的情况。如果操作系统内存不足,或者资源限制已经应用于你的程序,这是可能发生的。如果 std::thread::spawn 函数不能去产生一个新线程,它就会 panic。

-## 线程作用域 +## 作用域内的线程 如果我们确信生成的线程不会比某个范围存活更久,那么线程可以安全地借用哪些不会一直存在的东西,例如局部变量,只要它们比该范围活得更久。 -Rust 标准库提供了 `std::thread::scope` 去产生此类*线程作用域*。它允许我们产生不超过我们传递给该函数闭包的范围的线程,这使它可能安全地借用局部变量。 +Rust 标准库提供了 `std::thread::scope` 去产生此类*作用域内的线程*。它允许我们产生不超过我们传递给该函数闭包的范围的线程,这使它可能安全地借用局部变量。 它的工作原理最好使用一个示例来展示: @@ -196,7 +196,7 @@ thread::scope(|s| { }); ``` -确切的错误信息以待遇 Rust 编译器版本,因为它经常被改进以提升更好的判断,但是试图去编译以上代码将导致一下问题: +确切的错误信息取决于 Rust 编译器版本,因为它会在不断改进中以产生更好的诊断,但是试图去编译以上代码将导致以下问题: ```txt error[E0499]: cannot borrow `numbers` as mutable more than once at a time @@ -215,7 +215,7 @@ error[E0499]: cannot borrow `numbers` as mutable more than once at a time

泄漏启示录

-

在 Rust 1.0 之前,标准库有一个函数叫做 std::thread::scoped,它将直接产生一个线程,就像 std::thread::spawn。它允许无 'static 的捕获,因为它返回的不是 JoinGuard,而是当被 drop 时 join 到线程的 JoinGuard。任意的借用数据仅需要比这个 JoinGuard 活得更久。只要 JoinGuard 在某个时候被 drop,这似乎是安全的。

+

在 Rust 1.0 之前,标准库有一个函数叫做 std::thread::scoped,它将直接产生一个线程,就像 std::thread::spawn。它允许无 'static 的捕获,因为它返回的不是 JoinHandle,而是当被 drop 时 join 到线程的 JoinGuard。任意的借用数据仅需要比这个 JoinGuard 活得更久。只要 JoinGuard 在某个时候被 drop,这似乎是安全的。

就在 Rust 1.0 发布之前,人们慢慢发现它似乎不能保证某些东西被 drop。有很多种方式没有 drop 它,例如创建一个引用计数节点的循环,可以忘记某些东西或者*泄漏*它。

@@ -226,11 +226,11 @@ error[E0499]: cannot borrow `numbers` as mutable more than once at a time ## 共享所有权以及引用计数 -目前,我们已经使用了 `move` 闭包([“Rust 中的线程”](#rust-中的线程))将值的所有权转移到线程并从生命周期较长的父线程借用数据([“线程作用域”](#线程作用域))。当两个线程之间共享数据,它们之间的任何一个线程都不能保证比另一个线程的生命周期长,那么它们都不能称为该数据的所有者。它们之间共享的任何数据都需要与最长生命周期的线程一样长。 +目前,我们已经使用了 `move` 闭包([“Rust 中的线程”](#rust-中的线程))将值的所有权转移到线程并从生命周期较长的父线程借用数据([作用域内的线程](#作用域内的线程))。当两个线程之间共享数据,它们之间的任何一个线程都不能保证比另一个线程的生命周期长,那么它们都不能称为该数据的所有者。它们之间共享的任何数据都需要与最长生命周期的线程一样长。 ### Static -有几种方式去创建不属于单线程的东西。最简单的方式是**静态**值,它由整个程序“拥有”,而不是单个线程。在以下示例中,这两个线程都可以获取 X,但是它们并不能拥有它: +有几种方式去创建不属于单线程的东西。最简单的方式是**静态**值,它由整个程序“拥有”,而不是单个线程。在以下示例中,这两个线程都可以获取 X,但是它们并不拥有它: ```rust static X: [i32; 3] = [1, 2, 3]; @@ -262,7 +262,7 @@ thread::spawn(move || dbg!(x)); ### 引用计数 -为了确保共享数据能够 drop 和取消分配,我们不能完全放弃它的所有权。相反,我们可以*分享所有权*。通过跟踪所有者的数量,我们确保仅当没有所有者时,才会丢弃该值。 +为了确保共享数据能够 drop 和释放内存,我们不能完全放弃它的所有权。相反,我们可以*分享所有权*。通过跟踪所有者的数量,我们确保仅当没有所有者时,才会丢弃该值。 Rust 标准库通过 `std::rc::Rc` 类型提供了该功能,它是“引用计数”(reference counted)的缩写。它与 `Box` 非常类似,唯一的区别是克隆它将不会分配任何新内存,而是增加存储在包含值旁边的计数器。原始的 `Rc` 和克隆的 `Rc` 将引用相同的内存分配;它们*共享所有权*。 @@ -376,7 +376,7 @@ fn f(a: &i32, b: &mut i32) { 这里,我们得到一个整数的不可变引用,并在增加 b 所引用的整数进行递增操作之前和之后存储整数的值。编译器可以自由地假设关于借用和数据竞争的基本规则得到了遵守,这意味着 b 不可能引用与 a 相同的整数。实际上,在对 a 进行借用时,整个程序中没有任何地方对 a 借用的整数进行可变借用。因此,编译器可以轻松地推断 `*a` 不会发生变化,并且 `if` 语句将永远不是 true,并且可以作为优化完全地删除 x 调用。 -除了使用一个不安全的块(`unsafe`)去禁止一些编译器安全地检查,否则不可能去写 Rust 程序打断编译器的假设。 +除了使用不安全的块(`unsafe`)禁止一些编译器的安全检查之外,不可能写出编译器出乎意料的 Rust 程序。

未定义行为

@@ -384,20 +384,20 @@ fn f(a: &i32, b: &mut i32) {

在 Rust 中,仅当使用 unsafe 代码块才能打破这些规则。“unsafe”并不意味着代码是错误的或者错位安全使用,而是编译器并没有为你验证代码是安全的。如果代码却是违法了这些规则,则称为不健全的(unsound)。

-

允许编译器在不检查的情况下假设这些规则从未破坏。当破坏是,这将导致叫做为定义行为的问题,我们需要不惜一切代价去避免。如果我们允许编译器作出与实际不符的假设,那么它可能很容易导致关于代码不同部分更错误的结论,影响你整个程序。

+

允许编译器在不检查的情况下假设这些规则从未破坏。当破坏是,这将导致叫做未定义行为的问题,我们需要不惜一切代价去避免。如果我们允许编译器作出与实际不符的假设,那么它可能很容易导致关于代码不同部分更错误的结论,影响你整个程序。

作为一个具体的例子,让我们看看在切片上使用 get_unchecked 方法的小片段:

let a = [123, 456, 789];
 let b = unsafe { a.get_unchecked(index) };
-

get_unchecked 方法给我们一个给定索引的切片元素,就像 a[index],但是允许变异器假设索引总是在边界,没有任何检查。

+

get_unchecked 方法给我们一个给定索引的切片元素,就像 a[index],但是允许编译器假设索引总是在边界,没有任何检查。

-

这意味着,在代码片段中,由于 a 的长度是 3,编译器可能假设索引小雨 3。这使我们确保其假设成立。

+

这意味着,在代码片段中,由于 a 的长度是 3,编译器可能假设索引小于 3。这使我们确保其假设成立。

如果我们破坏了这个假设,例如,我们以等于 3 的索引运行,任何事情都可能发生。它可能导致读取 a 之后存储的任何内存内容。这可能导致程序崩溃。它可能会执行程序中完全无关的部分。它可能会引起各种糟糕的情况。

-

或许令人惊讶的是,为定义行为可能“回到过去”,导致之前的代码出问题。要理解这种情况是如何发生的,想象我们上面的片段有一个 match 语句,如下:

+

或许令人惊讶的是,未定义行为甚至可以“时间回溯”,导致之前的代码出问题。要理解这种情况是如何发生的,想象我们上面的片段有一个 match 语句,如下:

match index {
    0 => x(),
@@ -408,11 +408,11 @@ let b = unsafe { a.get_unchecked(index) };
let a = [123, 456, 789]; let b = unsafe { a.get_unchecked(index) }; -

由于不安全的代码,允许编译器假设索引只有 0、1 或 2。从逻辑上讲,我们的 match 语句的最后分支仅会匹配到 2,因此 z 仅会调用为 z(2)。这个结论不仅可以优化匹配,还可以优化 z 本身。这可以扔掉代码中未使用的部分。

+

由于不安全的代码,编译器被允许假设索引只有 0、1 或 2。它可能会逻辑的得出结论,我们的 match 语句的最后分支仅会匹配到 2,因此 z 仅会调用为 z(2)。这个结论不仅可以优化匹配,还可以优化 z 本身。这可能包括丢弃代码中未使用的部分。

-

如果我们以 3 的索引执行此设置,我们的程序试图去执行已优化的部分,导致完全地为定义行为,这还远在我们到达最后一行不安全的块之前。就像这样,未定义行为通过整个程序向后或者向前传播,通过非常意想不到的方式传播。

+

如果我们以 3 为索引执行此设置,我们的程序可能会尝试执行被优化的部分,导致在我们到达最后一行的不安全块之前就出现不可预测的行为。就像这样,未定义行为通过整个程序向后或者向前传播,而这往往是以非常出乎意料的方式发生的。

-

当调用任何的不安全函数时,读它的文档并确保你完全理解它的安全需求:作为调用者,你需要坚持的假设,以避免未定义行为。

+

当调用任何的不安全函数时,仔细阅读其文档,确保你完全理解它的安全要求:作为调用者,你需要维持的假设,以避免未定义行为。

## 内部可变性 @@ -431,7 +431,7 @@ let b = unsafe { a.get_unchecked(index) }; ### Cell -`std::cell::Cell` 仅是包裹了 T,但允许通过共享引用进行可变。为避免未定义行为,它仅允许你讲值复制出来(如果 T 实现 Copy)或者将其替换为另一个整体值。此外,它仅用于单个线程。 +`std::cell::Cell` 仅是包裹了 T,但允许通过共享引用进行可变。为避免未定义行为,它仅允许你将值复制出来(如果 T 实现 Copy)或者将其替换为另一个整体值。此外,它仅用于单个线程。 让我们看一看与上一节相似的示例,但是这一次使用 `Cell` 而不是 `i32`: @@ -462,7 +462,7 @@ fn f(v: &Cell>) { ### RefCell -与常规的 Cell 不同的是,`std::cell::RefCell` 与许你以很小的运行时花费去借用它的内容。`RefCell` 不仅持有 T,同时也持跟踪任何未解除的借用。如果你尝试在已经可变借用时尝试借用(或反之亦然),它会引发 panic,以避免出现未定义行为。就像 Cell,RefCell 只能在单个线程中使用。 +与常规的 Cell 不同的是,`std::cell::RefCell` 允许你以很小的运行时花费去借用它的内容。`RefCell` 不仅持有 T,同时也持跟踪任何未解除的借用。如果你尝试在已经可变借用时尝试借用(或反之亦然),它会引发 panic,以避免出现未定义行为。就像 Cell,RefCell 只能在单个线程中使用。 借用 RefCell 的内容通过调用 `borrow` 或者 `borrow_mut` 完成: @@ -504,7 +504,7 @@ fn f(v: &RefCell>) { ## 线程安全:Send 和 Sync -在这一章节中,我们已经看见一个不是*线程安全*的类型,这些类型仅用于一个单线程,例如 `Rc`、`Cell` 以及其它。由于需要这些限制来避免未定义行为,编译器需要为你理解和检查,因此你可以使用这些类型,而不必使用 unsafe 块。 +在这一章节中,我们已经看见一个不是*线程安全*的类型,这些类型仅用于一个单线程,例如 `Rc`、`Cell` 以及其它。由于需要这些限制来避免未定义行为,所以编译器需要理解并为你检查这个限制,这样你就可以在不使用 unsafe 块的情况下使用这些类型。 该语言使用两种特殊的 trait 以更总这些类型可以安全地用作交叉线程: @@ -560,7 +560,7 @@ fn main() { 这里,我们尝试去发送 `Rc` 到一个新线程,但是 `Rc` 与 `Arc` 不同,它没有实现 Send。 -如果我们尝试去编译以上示例,我们将面临看山区像这样的错误: +如果我们尝试去编译以上示例,我们将面临一个类似这样的错误: ```txt error[E0277]: `Rc` cannot be sent between threads safely @@ -615,7 +615,7 @@ fn main() { } ``` -在这里,我们有一个 `Mutex`,一个保护整数的 mutex,并且我们产生十个线程,每个整数增加线程 100 倍。每个线程将首先锁定 mutex 去获取 MutexGuard,并且然后使用 guard 去获取整数并修改它。当该变量超出作用域后,guard 会立即隐式 drop。 +在这里,我们有一个 `Mutex`,一个保护整数的 mutex,并且我们启动了十个线程,每个线程会增加好这个整数 100 次。每个线程将首先锁定 mutex 去获取 MutexGuard,并且然后使用 guard 去获取整数并修改它。当该变量超出作用域后,guard 会立即隐式 drop。 线程完成后,我们可以通过 `into_inner()` 安全地从整数中移除保护。`into_inner` 方法获取 mutex 的所有权,这确保了没有其它东西可以引用 mutex,从而使 mutex 变得不再必要。 @@ -776,11 +776,11 @@ fn main() { } ``` -消费线程进行一个无穷尽的循环,它将项弹出队列,使用 `dbg` 宏展示它们。当队列为空的时候,它停止并且使用 `park()` 函数进行睡眠。如果它得到解锁,`park()` 调用将返回,循环继续,再次从队列中弹出项,直到它是空的。等等。 +消费线程运行一个无限循环,它将项弹出队列,使用 `dbg` 宏展示它们。当队列为空的时候,它停止并且使用 `park()` 函数进行睡眠。如果它得到解锁,`park()` 调用将返回,循环继续,再次从队列中弹出项,直到它是空的。等等。 生产线程将其推入队列,每秒产生一个新的数字。每次增加一个项时,它都会在 Thread 对象上使用 `unpark()` 方法,该方法引用消费线程来解锁它。这样,消费线程就会被唤醒处理新的元素。 -这样,我们要作出的一个重要的观测是,如果我们移除**阻塞**,程序将在理论上正确,尽管效率低下。这是重要的,因为 `park()` 不能保证它将由于匹配 `unpark()` 而返回。尽管有些罕见,但它很可能会有*虚假唤醒*。我们的示例处理得很好,因为消费线程将锁定去咧,可以看到它是空的,然后直接解锁它并再次阻塞。 +需要注意的一点是,即使我们移除**阻塞**,这个程序在理论上仍然是正确的,尽管效率低下。这是重要的,因为 `park()` 不能保证它将由于匹配 `unpark()` 而返回。尽管有些罕见,但它很可能会有*虚假唤醒*。我们的示例处理得很好,因为消费线程会锁定队列,可以看到它是空的,然后直接解锁它并再次阻塞。 线程阻塞的一个重要属性是,在线程自己进入阻塞之前,对 `unpark()` 的调用不会丢失。对 unpark 的请求仍然被记录下来,并且下次线程尝试挂起自己的时候,它会清除该请求并且直接继续执行,实际上并不会进入睡眠状态。为了理解这对于正确操作的关键性,让我们来看一下程序可能执行步骤的顺序: @@ -793,19 +793,19 @@ fn main() { 7. P 调用 `unpark()` 去通知 C,有一些新的项。 8. C 调用 `park()` 去睡眠,以等待更多的项。 -虽然在步骤 3 解锁队列和在步骤 8 阻塞之间很可能仅有一个很短的时间,但第 4 步和第 7 步可能在线程阻塞自己之前发生。如果 `unpark()` 在线程没有挂起时不执行任何操作,那么通知将会丢失。即使队列中有项,消费线程仍然在等待。由于 unpark() 请求被保存,以供将来调用 park() 时使用,我们不必担心这个问题。 +虽然在步骤 3 解锁队列和在步骤 8 阻塞之间很可能仅有一个很短的时间,但第 4 步和第 7 步可能在线程阻塞自己之前发生。如果 `unpark()` 在线程没有挂起时不执行任何操作,那么通知将会丢失。即使队列中有项,消费线程仍然在等待。由于 `unpark()` 请求被保存,以供将来调用 `park()` 时使用,我们不必担心这个问题。 然而,unpark 请求并不会堆起来。先调用两次 `unpark()`,然后再调用两次 `park()`,线程仍然会进入睡眠状态。第一次 `park()` 清除请求并直接返回,但第二次调用通常让它进入睡眠。 这意味着,在我们上面的示例中,重要的是我们看见队列为空的时候,我们仅会阻塞线程,而不是在处理每个项之后将其阻塞。然而由于巨长的(1s)睡眠,这种情况在本示例中几乎不可能发生,但多个 `unpark()` 调用仅能唤醒单个 `park()` 调用。 -不幸的是,这确实意味着,如果在 `park()` 返回后,立即调用 `unpark()`,但是在队列得到锁定并清空之前,`unpark()` 调用是不必要的,单仍然会导致下一个 `park()` 调用立即返回。这导致(空的)队列多次被锁定并解锁。虽然这不会影响程序的正确性,但这确实会影响它的效率和性能。 +不幸的是,这确实意味着,如果在 `park()` 返回后,立即调用 `unpark()`,但是在队列得到锁定并清空之前,`unpark()` 调用是不必要的,但仍然会导致下一个 `park()` 调用立即返回。这导致(空的)队列多次被锁定并解锁。虽然这不会影响程序的正确性,但这确实会影响它的效率和性能。 这种机制在简单的情况下是好的,比如我们的示例,但是当东西变得复杂,情况可能会很糟糕。例如,如果我们有多个消费线程从相同的队列获取项时,生产线程将不会知道有哪些消费者实际上在等待以及应该被唤醒。生产者将必须知道消费者正在等待的时间以及正在等待的条件。 ### 条件变量 -条件变量是一个更通用的选贤,用于等待受 mutex 保护的数据发生变化。它有两种基本操作:等待和通知。线程可以在条件变量上等待,然后在另一个线程通知相同条件变量时被唤醒。多个线程可以在通向的条件变量上等待,通知可以发送给一个等待线程或者所有等待线程。 +条件变量是一个更通用的选项,用于等待受 mutex 保护的数据发生变化。它有两种基本操作:等待和通知。线程可以在条件变量上等待,然后在另一个线程通知相同条件变量时被唤醒。多个线程可以在同样的条件变量上等待,通知可以发送给一个等待线程或者所有等待线程。 这意味着我们可以为我们感兴趣的事件或条件创建一个条件变量,例如,队列是非空的,并且在该条件下等待。任意导致事件或条件发生的线程都会通知条件变量,无需知道那个或有多个线程对该通知感兴趣。 @@ -815,7 +815,7 @@ Rust 标准库提供了 `std::sync::Condvar` 作为条件变量。它的等待 它有两个通知方法:`notify_one` 仅唤醒一个线程(如果有),和 `notify_all` 去唤醒所有线程。 -让我们改用 Condvar 修改我们用于线程停放的示例: +让我们改用 Condvar 修改我们用于线程阻塞的示例: ```rust use std::sync::Condvar; @@ -850,11 +850,11 @@ thread::scope(|s| { * 我们必须改变一些事情: * 我们现在不仅有一个包含队列的 Mutex,同时有一个 Condvar 去通信“不为空”的条件。 * 我们不再需要知道要唤醒哪个线程,因此我们不再存储 spawn 的返回值。而是,我们通过使用 `notify_one` 方法的条件变量通知消费者。 - * 解锁、等待以及重新锁定都是通过 `wait` 方法完成的。我们不得不稍微重组控制流,以便出阿迪 guard 到 wait 方法,同时在处理项之前仍然 drop 它。 + * 解锁、等待以及重新锁定都是通过 `wait` 方法完成的。我们不得不稍微重组控制流,以便传递 guard 到 wait 方法,同时在处理项之前仍然 drop 它。 现在,我们可以根据自己的需求生成尽可能多的消费线程,甚至稍后生成更多线程,而无需更改任何东西。条件变量会负责将通知传递给任何感兴趣的线程。 -如果我们有个更加复杂的系统,其线程对不同条件感兴趣,我们可以为每个条件定义一个 `Condvar`。例如,我们能定义一个来指示队列是非空的并且另一个指示它是空的。然后,每个线程将等待与它们正在做的事情相关的条件。 +如果我们有个更加复杂的系统,其线程对不同条件感兴趣,我们可以为每个条件定义一个 `Condvar`。例如,我们能定义一个来指示队列是非空的条件,并且另一个指示队列是空的条件。然后,每个线程将等待与它们正在做的事情相关的条件。 通常,Condvar 仅能与单个 Mutex 一起使用。如果两个线程尝试使用两个不同的 mutex 去并发地等待条件变量,它可能导致 panic。 @@ -867,7 +867,7 @@ Condvar 的缺点是,它仅能与 Mutex 一起工作,对于大多数用例 * 多线程可以并发地运行在相同程序并且可以在任意时间生成。 * 当主线程结束,主程序结束。 * 数据竞争是未定义行为,它会由 Rust 的类型系统完全地组织(在安全的代码中)。 -* 常规的线程可以像程序运行一样长时间,并且因此只能借用 `'static` 数据。例如静态和泄漏分配。 +* 常规的线程可以像程序运行一样长时间,并且因此只能借用 `'static` 数据。例如静态变量和泄漏分配。 * 引用计数(Arc)可以用于共享所有权,以确保只要有一个线程使用它,数据就会存在。 * 作用域线程用于限制线程的生命周期,以允许其借用非 `'static` 数据,例如作用域变量。 * `&T` 是*共享引用*。`&mut T` 是*独占引用*。常规类型不允许通过共享引用可变。 @@ -875,7 +875,7 @@ Condvar 的缺点是,它仅能与 Mutex 一起工作,对于大多数用例 * Cell 和 RefCell 是单线程内部可变性的标准类型。Atomic、Mutex 以及 RwLock 是它们多线程等价物。 * Cell 和原子类型仅允许作为整体替换值,而 RefCell、Mutex 和 RwLock 允许你通过动态执行访问规则直接替换值。 * 线程阻塞可以是等待某种条件的便捷方式。 -* 当条件是关于由 Mutex 保护的数据时,使用 `Condvar` 时更方便的,并且比线程阻塞更有效。 +* 当条件是关于由 Mutex 保护的数据时,使用 `Condvar` 时更方便,并且比线程阻塞更有效。

下一篇,第二章:Atomic diff --git a/2_Atomics.md b/2_Atomics.md index c813682..18d0340 100644 --- a/2_Atomics.md +++ b/2_Atomics.md @@ -108,7 +108,7 @@ fn main() { } ``` -这次,我们使用一个[线程作用域](./1_Basic_of_Rust_Concurrency.md#线程作用域),它将自动地为我们处理线程的 join,并且也允许我们借用局部变量。 +这次,我们使用一个[作用域内的线程](./1_Basic_of_Rust_Concurrency.md#作用域内的线程),它将自动地为我们处理线程的 join,并且也允许我们借用局部变量。 每次后台线程完成处理项时,它都会将处理的项目数量存储在 AtomicUsize 中。与此同时,主线程向用户显示该数字,告知该进度,大约每秒一次。一旦主线程看见所有 10 项已经被处理,它就会退出作用域,它会隐式地 join 后台线程,并且告知用户所有都完成。 diff --git a/3_Memory_Ordering.md b/3_Memory_Ordering.md index eb78b23..d0dfc13 100644 --- a/3_Memory_Ordering.md +++ b/3_Memory_Ordering.md @@ -278,17 +278,17 @@ fn main() {

想象一下,两个线程都将一个 7 release-store 到相同的原子变量中,第三个线程从该变量中加载 7。第三个线程和第一个或者第二个线程有一个 happens-before 关系吗?这取决于它加载“哪个 7”:线程一还是线程二的。(或许一个不相关的 7)。这使我们得出的结论是,尽管 7 等于 7,但两个 7 与两个线程有一些不同。

-

思考这个问题的方式是我们在[“Relaxed 排序”](#relaxed-排序)中讨论的*总修改顺序*:发生在原子变量上的所有修改的有序列表。即使将相同的值多次写入相同的变量,这些操作中的每一个都以该变量的总修改顺序代表一个单独的事件。当我们加载一个值,加载的值与每个变量“时间线”上的特定点相匹配,这告诉我们我们可能会同步哪个操作。

+

思考这个问题的方式是我们在“Relaxed 排序”中讨论的*总修改顺序*:发生在原子变量上的所有修改的有序列表。即使将相同的值多次写入相同的变量,这些操作中的每一个都以该变量的总修改顺序代表一个单独的事件。当我们加载一个值,加载的值与每个变量“时间线”上的特定点相匹配,这告诉我们我们可能会同步哪个操作。

例如,如果原子总修改顺序是

- 1. 初始化为 0 +

1. 初始化为 0

- 2. Release-store 7(来自线程二) +

2. Release-store 7(来自线程二)

- 3. Release-store 6 +

3. Release-store 6

- 4. Release-store 7(来自线程一) +

4. Release-store 7(来自线程一)

然后,acquire-load 7 将与第二个线程的 release-store 或者最后一个事件的 release-store 同步。然而,如果我们之前(就 happens-before 关系而言)见过 6,我们知道我们看到的是最后一个 7,而不是第一个 7,这意味着我们现在与线程一有 happens-before 的关系,而不是线程二。

@@ -296,17 +296,17 @@ fn main() {

例如,想象一个具有以下总修改顺序的原子变量:

- 1. 初始化为 0 +

1. 初始化为 0

- 2. Release-store 7 +

2. Release-store 7

- 3. Relaxed-fetch-and-add 1,改变 7 到 8 +

3. Relaxed-fetch-and-add 1,改变 7 到 8

- 4. Relaxed-fetch-and-add 1,改变 8 到 9 +

4. Relaxed-fetch-and-add 1,改变 8 到 9

- 5. Release-store 7 +

5. Release-store 7

- 6. Relaxed-swap 10,改变 7 到 10 +

6. Relaxed-swap 10,改变 7 到 10

现在,如果我们在这个变量上执行 acquire-load 到 9,我们不仅与第四个操作(存储此值)建立了一个 happens-before 关系,同时也与第二个操作(存储 7)建立了该关系,即使第三个操作使用了 Relaxed 内存排序。

diff --git a/5_Building_Our_Own_Channels.md b/5_Building_Our_Own_Channels.md index 99cd244..8d1ee8e 100644 --- a/5_Building_Our_Own_Channels.md +++ b/5_Building_Our_Own_Channels.md @@ -1,19 +1,739 @@ # 第五章:构建我们自己的 Channel +*Channel* 可以被用于在线程之间发送数据,并且它们有很多变体。一些 channel 仅能在一个发送者和一个接收者之间使用,而另一些可以在任意数量的线程之间发送,或者甚至允许多个接收者。一些 channel 是阻塞的,这意味着接收(有时也包括发送)是一个阻塞操作,这会使线程进入睡眠状态,知道你的操作完成。一些 channel 针对团兔粮进行优化,而另一些针对低延迟进行优化。 + +这些变体是无穷尽的,没有一种通用版本在所有场景都适合的。 + +在该章节,我们将实现一个相对简单的 channel,不仅可以探索更多的原子应用,同时也可以了解如何在 Rust 类型系统中捕获我们的需求和假设。 + ## 一个简单的以 mutex 为基础的 Channel +一个基础的 channel 实现并不需要任何关于原子的知识。我们可以采用 `VecDeque`,它根本上是一个 `Vec`,允许在两端高效的添加和移除元素,并使用 Mutex 保护它,以允许多个线程访问。然后,我们使用 `VecDeque` 作为已发送但尚未接受数据的消息队列。任何想要发送消息的线程只需要将其添加到队列的末尾,而任何想要接受消息的线程只需从队列的前端删除一个消息。 + +还有一件事需要补充,用于使接收操作阻塞:Condvar(参见[第一章“条件变量”](./1_Basic_of_Rust_Concurrency.md#条件变量)),以通知等待接收者新的消息。 + +这样做的实现可能非常简短且相对简单,如下所示: + +```rust +pub struct Channel { + queue: Mutex>, + item_ready: Condvar, +} + +impl Channel { + pub fn new() -> Self { + Self { + queue: Mutex::new(VecDeque::new()), + item_ready: Condvar::new(), + } + } + + pub fn send(&self, message: T) { + self.queue.lock().unwrap().push_back(message); + self.item_ready.notify_one(); + } + + pub fn receive(&self) -> T { + let mut b = self.queue.lock().unwrap(); + loop { + if let Some(message) = b.pop_front() { + return message; + } + b = self.item_ready.wait(b).unwrap(); + } + } +} +``` + +注意,我们并没有使用任意的原子操作或者不安全代码,也不需要考虑 `Send` 或者 `Sync`。编译器理解 Mutex 的接口以及保证该提供什么类型,并且会隐式地理解如果 `Mutex` 和 Condvar 都可以在线程之间安全共享,那么我们的 `Channel` 也可以。 + +我们的 `send` 函数锁定 mutex,将新消息推入队列的末尾,并且使用条件变量在解锁队列后直接通知可能等待的接收者。 + +`receive` 函数也锁定 mutex,然后从队列的首部弹出消息,但如果仍然没有可获得的消息,则会使用条件变量去等待。 + +> 记住,`Condvar::wait` 方法将在等待时解锁 Mutex,并在返回之前重新锁定它。因此,我们的 `receive` 函数将不会在等待时锁定 mutex。 + +尽管这个 channel 在使用上是非常灵活的,因为它允许任意数量的发送和接收线程,它的实现在很多情况下远非最佳。即使有大量的消息准备好被接收,任意的发送或者接收操作将短暂地阻塞任意其它的发送或者接收操作,因为它们必须都锁定相同的 mutex。如果 `VecDeque::push` 不得不增加 VecDeque 的容量,所有的发送和接收线程将不得不等待该线程完成重新分配,这在某些情况下是不可接受的。 + +另一个可能不可取的属性是,该 channel 的队列可能会无限制地增长。没有什么能阻止发送者以比接收者更高的速度持续发送新消息。 + ## 一个不安全的一次性 Channel +channel 的各种用例几乎是无止尽的。然而,在本章的剩余部分,我们将专注于一种特定类型的用例:恰好从一个线程向另一个线程发送一条消息。为此类用例设计的 channel 通常被称为 *一次性*(one-shot)channel。 + +我们采用上述基于 `Mutex` 的实现,并且将 `VecDeque` 替换为 `Option`,从而将队列的容量减小到恰好一个消息。这样可以避免内存分配,但是仍然会有使用 Mutex 的一些缺点。我们可以通过使用原子操作从头构建我们自己的一次性 channel 来避免这个问题。 + +首先,让我们构建一个最小化的一次性 channel 实现,不需要考虑它的接口。在本章的稍后,我们将探索如何改进其接口以及如何与 Rust 类型相结合,为 channel 的用于提供愉快的体验。 + +我们需要开始的工具基本上与我们在[第四章](./4_Building_Our_Own_Spin_Lock.md)使用的 `SpinLock` 基本相同:一个用于存储的 `UnsafeCell` 和用于指示状态的 `AtomicBool`。在该示例中,我们使用原子布尔值去指示消息是否准备好用于消费。 + +在发送消息之前,channel 是“空的”并且不包含任意类型为 T 的消息。我们可以在 cell 中使用 `Option`,以允许 T 缺失。然而,这可能会浪费宝贵的内存空间,因为我们的原子布尔值已经告诉我们是否有消息。相反,我们可以使用 `std::mem::MaybeUninit`,它本质上是裸露的 `Option` 的不安全版本:它要求用户手动跟踪其是否已初始化,几乎整个接口都是不安全的,因为它不能执行自己的检查。 + +综合来看,我们从这个结构体定义开始我们的第一次尝试: + +```rust +use std::mem::MaybeUninit; + +pub struct Channel { + message: UnsafeCell>, + ready: AtomicBool, +} +``` + +就像我们的 `SpinLock` 一样,我们需要告诉编译器,我们的 channel 在线程之间共享是安全的,或者至少只要 T 是 `Send` 的: + +```rust +unsafe impl Sync for Channel where T: Send {} +``` + +一个新的 channel 是空的,将 `ready` 设置为 false,并且消息仍然没有初始化: + +```rust +impl Channel { + pub const fn new() -> Self { + Self { + message: UnsafeCell::new(MaybeUninit::uninit()), + ready: AtomicBool::new(false), + } + } + + // … +} +``` + +要发送消息,它首先需要存储在 cell 中,之后我们可以通过将 ready 标志设置为 true 来将其释放给接收者。试图做这个超过一次是危险的,因为设置 ready 标志后,接收者可能在任意时刻读取消息,这可能会与第二次发送消息产生数据竞争。目前,我们通过使方法不安全并为它们留下备注,将此作为用户的责任: + +```rust + /// Safety: Only call this once! + pub unsafe fn send(&self, message: T) { + (*self.message.get()).write(message); + self.ready.store(true, Release); + } +``` + +在上面这个片段中,我们使用 `UnsafeCell::get` 方法去获取指向 `MaybeUninit` 的指针,并且通过不安全地解引用它来调用 `MaybeUninit::write` 进行初始化。当错误使用时,这可能导致未定义行为,但我们将这个责任注意到了调用方身上。 + +对于内存排序,我们需要使用 release 排序,因为原子的存储有效地将消息释放给接收者。这确保了如果接收线程从 `self.ready` 以 acquire 排序加载 true,则消息的初始化将从接受线程的角度完成。 + +对于接收,我们暂时不会提供阻塞的接口。相反,我们将提供两个方法:一个用于检查是否有可用消息,另一个用于接收消息。我们将让我们的 channel 用户决定是否使用[线程阻塞](./1_Basic_of_Rust_Concurrency.md#线程阻塞)的方法来阻塞。 + +以下是完成此版本我们 channel 的最后两种方法: + +```rust + pub fn is_ready(&self) -> bool { + self.ready.load(Acquire) + } + + /// Safety: Only call this once, + /// and only after is_ready() returns true! + pub unsafe fn receive(&self) -> T { + (*self.message.get()).assume_init_read() + } +``` + +虽然 `is_ready` 方法可以始终地安全调用,但是 `receive` 方法使用了 `MaybeUninit::assume_init_read()`,这不安全地假设它已经被初始化,且不会用于生成非 `Copy` 对象的多个副本。就像 `send` 方法一样,我们只需通过将函数本身标记为不安全来将这个问题交给用户解决。 + +结果是一个在技术上可用的 channel,但它用起来不便并且通常令人失望。如果正确使用,它会按预期进行操作,但有很多微妙的方式去错误地使用它。 + +多次调用 send 可能会导致数据竞争,因为第二个发送者在接收者尝试读取第一条消息时可能正在覆盖数据。即使接收操作得到了正确的同步,从多个线程调用 send 可能会导致两个线程尝试同时写入 cell,再次导致数据竞争。此外,多次调用 `receive` 会导致获取两个消息的副本,即使 T 不实现 `Copy` 并且因此不能安全地进行复制。 + +更微妙的问题是我们的通道缺乏 `Drop` 实现。`MaybeUninit` 类型不会跟踪它是否已经初始化,因此在被 drop 时不会自动 drop 其内容。这意味着如果发送了一条消息但从未被接收,该消息将永远不会被 drop。这不是不正确的,但仍然是要避免。在 Rust 中,泄漏被普遍认为是安全的,但通常只有作为另一个泄漏的结果才是可接受的。例如,泄漏 Vec 也会泄漏其内容,但正常使用 Vec 不会导致任何泄漏。 + +由于我们让用户对一切负责,不幸的事故只是时间问题。 + ## 通过运行时检查来达到安全 +为了提供更安全的接口,我们可以增加一些检查,以确保误用会导致 panic 并显示清晰的错误信息,这比未定义行为要好得多。 + +让我们在消息准备好之前调用 `receive` 方法的问题开始处理。这个问题很容易解决,我们只需要在尝试读消息之前让 receive 方法验证 ready 标识即可: + +```rust + /// Panics if no message is available yet. + /// + /// Tip: Use `is_ready` to check first. + /// + /// Safety: Only call this once! + pub unsafe fn receive(&self) -> T { + if !self.ready.load(Acquire) { + panic!("no message available!"); + } + (*self.message.get()).assume_init_read() + } +``` + +该函数仍然是不安全的,因为用户仍然需要确保只调用一次,但未能首先检查 `is_ready()` 不再导致未定义行为。 + +因为我们现在在 `receive` 方法里有一个 `ready` 标识的 acquire-load 操作,其提供了必要的同步,我们可以在 `is_ready` 中使用 Relaxed 内存排序,因为该操作现在仅用于指示目的: + +```rust + pub fn is_ready(&self) -> bool { + self.ready.load(Relaxed) + } +``` + +> 记住,ready 上的总修改顺序(参见[第三章的“Relaxed 排序”](./3_Memory_Ordering.md#relaxed-排序))保证了从 `is_ready` 加载 true 之后,receive 也能看到 true。无论 is_ready 使用的内存排序如何,都不会出现 `is_ready` 返回 true,`receive()` 仍然出现 panic 的情况。 + +下一个要解决的问题是,当调用 receive 不止一次时会发生什么。通过在接收者法中将 `ready` 标志设置回 false,我们也可以很容易地导致 panic,例如: + +```rust + /// Panics if no message is available yet, + /// or if the message was already consumed. + /// + /// Tip: Use `is_ready` to check first. + pub fn receive(&self) -> T { + if !self.ready.swap(false, Acquire) { + panic!("no message available!"); + } + // Safety: We've just checked (and reset) the ready flag. + unsafe { (*self.message.get()).assume_init_read() } + } +``` + +我们仅是将 load 操作更改为 swap 操作(交换的值为 `false`),突然之间,receive 方法在任何情况下都可以安全地调用。该函数不再标记为不安全。我们现在承担了不安全代码的责任,而不是让用户负责一切,从而减轻了用户的压力。 + +对于 send,事情稍微复杂一点。为了阻止多个 send 调用同时访问 cell,我们需要知道是否另一个 send 调用已经开始。ready 标识仅告诉我们是否另一个 send 调用已经完成,所以这还不够。 + +让我们增加第二个标识,命名为 `in_use`,以指示该 channel 是否已经在使用: + +```rust +pub struct Channel { + message: UnsafeCell>, + in_use: AtomicBool, // New! + ready: AtomicBool, +} + +impl Channel { + pub const fn new() -> Self { + Self { + message: UnsafeCell::new(MaybeUninit::uninit()), + in_use: AtomicBool::new(false), // New! + ready: AtomicBool::new(false), + } + } + + //… +} +``` + +现在我们需要做的就是在访问 cell 之前,在 send 方法中,将 `in_use` 设置为 true,如果它已经由另一个线程设置,则 panic: + +```rust + /// Panics when trying to send more than one message. + pub fn send(&self, message: T) { + if self.in_use.swap(true, Relaxed) { + panic!("can't send more than one message!"); + } + unsafe { (*self.message.get()).write(message) }; + self.ready.store(true, Release); + } +``` + +我们可以为原子 swap 操作使用 relaxed 内存排序,因为 `in_use` 的*总修改顺序*(参见[第三章“Relaxed 排序”](./3_Memory_Ordering.md#relaxed-排序))保证了在 in_use 上只会有一个 swap 操作返回的 false,而这是 send 方法尝试访问 cell 的唯一情况。 + +现在我们拥有了一个完全安全的接口,尽管还有一个问题未解决。最后一个问题出现在发送一个永远不会被接收的消息时:它将从不会被 drop。虽然这不会导致未定义行为,并且在安全代码中是允许的,但确实应该避免这种情况。 + +由于我们在 receive 方法中重置了 ready 标志,修复这个问题很容易:ready 标志指示是否在 cell 中尚未接受的消息需要被 drop。 + +在我们的 Channel 的 Drop 实现中,我们不需要使用一个原子操作去检查原子 ready 标识,因为只有在在完全由正在 drop 的线程拥有所有权,且没有任何未解除借用的情况下,才能 drop 一个对象。这意味着,我们可以使用 `AtomicBool::get_mut` 方法,它接受一个独占引用(`&mut self`),以证明原子访问是不必要的。对于 UnsafeCell 也是一样,通过 `UnsafeCell::get_mut` 方法来来获取独占引用。 + +使用它,这是我们完全安全且不泄漏的 channel 的最后一部分: + +```rust +impl Drop for Channel { + fn drop(&mut self) { + if *self.ready.get_mut() { + unsafe { self.message.get_mut().assume_init_drop() } + } + } +} +``` + +我们试试吧! + +由于我们的 channel 仍没有提供一个阻塞的接口,我们将手动地使用线程阻塞去等待消息。只要没有消息准备好,接收线程将 `park()` 自身,并且发送线程将在发送东西后,立刻 `unpark()` 接收者。 + +这里是一个完整的测试程序,通过我们的 `Channel` 从第二个线程发送字符串字面量“hello world”到主线程: + +```rust +fn main() { + let channel = Channel::new(); + let t = thread::current(); + thread::scope(|s| { + s.spawn(|| { + channel.send("hello world!"); + t.unpark(); + }); + while !channel.is_ready() { + thread::park(); + } + assert_eq!(channel.receive(), "hello world!"); + }); +} +``` + +该程序编译、运行和干净地退出,表明我们的 Channel 正常工作。 + +如果我们复制了 send 行,我们也可以在运行中看到我们的安全检查,当运行程序时,产生以下 panic: + +```txt +thread '' panicked at 'can't send more than one message!', src/main.rs +``` + +尽管 panic 程序并不出色,但是程序可靠的 panic 比坑的未定义行为错误好太多。 + +
+

为 Channel 状态使用单原子

+ +

如果你对 channel 实现还不满意,这里有一个微妙的变体,可以节省一字节的内存。

+ +

我们使用单个原子 AtomicU8 表示所有 4 个状态,而不是使用两个分开的布尔值去表示 channel 的状态。我们必须使用 compare_exchange 来原子地检查 channel 是否处于预期状态,并将其更改为另一个状态,而不是原子交换布尔值。

+ +
const EMPTY: u8 = 0;
+const WRITING: u8 = 1;
+const READY: u8 = 2;
+const READING: u8 = 3;
+
+pub struct Channel {
+    message: UnsafeCell>,
+    state: AtomicU8,
+}
+
+unsafe impl Sync for Channel {}
+
+impl Channel {
+    pub const fn new() -> Self {
+        Self {
+            message: UnsafeCell::new(MaybeUninit::uninit()),
+            state: AtomicU8::new(EMPTY),
+        }
+    }
+
+    pub fn send(&self, message: T) {
+        if self.state.compare_exchange(
+            EMPTY, WRITING, Relaxed, Relaxed
+        ).is_err() {
+            panic!("can't send more than one message!");
+        }
+        unsafe { (*self.message.get()).write(message) };
+        self.state.store(READY, Release);
+    }
+
+    pub fn is_ready(&self) -> bool {
+        self.state.load(Relaxed) == READY
+    }
+
+    pub fn receive(&self) -> T {
+        if self.state.compare_exchange(
+            READY, READING, Acquire, Relaxed
+        ).is_err() {
+            panic!("no message available!");
+        }
+        unsafe { (*self.message.get()).assume_init_read() }
+    }
+}
+
+impl Drop for Channel {
+    fn drop(&mut self) {
+        if *self.state.get_mut() == READY {
+            unsafe { self.message.get_mut().assume_init_drop() }
+        }
+    }
+}
+ +
+ ## 通过类型来达到安全 +尽管我们已经成功地保护了我们 Channel 的用户免受未定义行为的问题,但是如果它们偶尔地不正确使用它,它们仍然有 panic 的风险。理想情况下,编译器将在程序运行之前检查正确的用法并指出滥用。 + +让我们来看看调用 send 或 receive 不止一次的问题。 + +为了防止函数被多次调用,我们可以让它*按值*接受参数,对于非 `Copy` 类型,这将消耗对象。对象被消耗或移动后,它会从调用者那里消失,防止它再次被使用。 + +通过将调用 send 或 receive 表示的能力作为单独的(非 `Copy`)类型,并在执行操作时消费对象,我们可以确保每个操作只能发生一次。 + +这给我们带来了以下接口设计,而不是单个 `Channel` 类型,一个 channel 由一对 `Sender` 和 `Receiver` 表示,它们各自都有以值接收 `self` 的方法: + +```rust +pub fn channel() -> (Sender, Receiver) { … } + +pub struct Sender { … } +pub struct Receiver { … } + +impl Sender { + pub fn send(self, message: T) { … } +} + +impl Receiver { + pub fn is_ready(&self) -> bool { … } + pub fn receive(self) -> T { … } +} +``` + +用户可以通过调用 `channel()` 创建一个 channel,这将给他们一个 Sender 和一个 Receiver。它们可以自由地传递每个对象,将它们移动到另一个线程,等等。然而,它们最终不能获得其中任何一个的多个副本,这保证了 send 和 receive 仅被调用一次。 + +为了实现这一点,我们需要为我们的 UnsafeCell 和 AtomicBool 找到一个位置。之前,我们仅有一个具有这些字段的结构体,但是现在我们有两个单独的结构体,每个结构体都可能存在更长的时间。 + +因为 sender 和 receiver 将需要共享这些变量的所有权,我们将使用 Arc([第一章“引用计数”](./1_Basic_of_Rust_Concurrency.md#引用计数))为我们提供引用计数的共享分配,我们将在其中存储共享的 Channel 对象。正如以下展示的,Channel 类型不必是公共的,因为它的存在是与用户无关的细节。 + +```rust +pub struct Sender { + channel: Arc>, +} + +pub struct Receiver { + channel: Arc>, +} + +struct Channel { // no longer `pub` + message: UnsafeCell>, + ready: AtomicBool, +} + +unsafe impl Sync for Channel where T: Send {} +``` + +就像之前一样,我们在 T 是 Send 的情况下为 `Channel` 实现了 `Sync`,以允许它跨线程使用。 + +注意,我们不再像我们之前 channel 实现中的那样,需要 `in_use` 原子布尔值。它仅通过 send 来检查它有没有被调用超过一次,现在通过类型系统静态地保证。 + +channel 函数去创建一个 channel 和一对 sender-receiver,它与我们之前的 `Channel::new` 函数类似,除了 Channel 包裹了一个 Arc,并将该 Arc 和其克隆包装在 Sender 和 Receiver 类型中: + +```rust +pub fn channel() -> (Sender, Receiver) { + let a = Arc::new(Channel { + message: UnsafeCell::new(MaybeUninit::uninit()), + ready: AtomicBool::new(false), + }); + (Sender { channel: a.clone() }, Receiver { channel: a }) +} +``` + +`send`、`is_ready` 和 `receive` 方法与我们之前实现的方法基本相同,但有一些区别: + +* 它们现在被移动到它们各自的类型中,因此只有(单个)发送者可以发送,并且只有(单个)接收者可以接收。 +* 发送和接收现在通过值而不是引用来接收 `self`,以确保它们每个只能被调用一次。 +* 发送不再 panic,因为它的先决条件(只被调用一次)现在被静态保证。 + +所以,他们现在看起来像这样: + +```rust +impl Sender { + /// This never panics. :) + pub fn send(self, message: T) { + unsafe { (*self.channel.message.get()).write(message) }; + self.channel.ready.store(true, Release); + } +} + +impl Receiver { + pub fn is_ready(&self) -> bool { + self.channel.ready.load(Relaxed) + } + + pub fn receive(self) -> T { + if !self.channel.ready.swap(false, Acquire) { + panic!("no message available!"); + } + unsafe { (*self.channel.message.get()).assume_init_read() } + } +} +``` + +receive 函数仍然可以 panic,因为用户可能仍然会在 `is_ready()` 返回 `true` 之前调用它。它仍然使用 `swap` 将 ready 标志设置回 false(而不仅仅是 load 操作),以便 Channel 的 Drop 实现知道是否有需要删除的未读消息。 + +该 Drop 实现与我们之前实现的完全相同: + +```rust +impl Drop for Channel { + fn drop(&mut self) { + if *self.ready.get_mut() { + unsafe { self.message.get_mut().assume_init_drop() } + } + } +} +``` + +当 `Sender` 或者 `Receiver` 被 drop 时,`Arc>` 的 Drop 实现将减少分配的引用计数。当 drop 到第二个时,计数达到 0,并且 `Channel` 自身被 drop。这将调用我们上面的 Drop 实现,如果已发送但未收到消息,我们将 drop 该消息。 + +让我们尝试它: + +```rust +fn main() { + thread::scope(|s| { + let (sender, receiver) = channel(); + let t = thread::current(); + s.spawn(move || { + sender.send("hello world!"); + t.unpark(); + }); + while !receiver.is_ready() { + thread::park(); + } + assert_eq!(receiver.receive(), "hello world!"); + }); +} +``` + +有一点不方便的是,我们仍然得手动地使用线程阻塞去等待一个消息,但是我们稍后将处理这个问题。 + +目前,我们的目标是在编译时使至少一种形式的滥用变得不可能。与过去不同,试图发送两次不会导致程序 Panic,相反,根本不会导致有效的程序。如果我们向上述工作程序增加另一个 send 调用,编译器现在捕捉问题并可能告知我们错误信息: + +```txt +error[E0382]: use of moved value: `sender` + --> src/main.rs + | + | sender.send("hello world!"); + | -------------------- + | `sender` moved due to this method call + | + | sender.send("second message"); + | ^^^^^^ value used here after move + | +note: this function takes ownership of the receiver `self`, which moves `sender` + --> src/lib.rs + | + | pub fn send(self, message: T) { + | ^^^^ + = note: move occurs because `sender` has type `Sender<&str>`, + which does not implement the `Copy` trait +``` + +根据情况,设计一个在编译时捕捉错误的接口可能非常棘手。如果这种情况确实适合这样的接口,它不仅可以为用户带来更多的便利,还可以减少运行时检查的数量,因为这些检查在静态上已经得到保证。例如,我们不再需要 `in_use` 标志,并从发送者法中移除了交换和检查步骤。 + +不幸的是,可能会出现新的问题,这可能导致运行时开销。在这种情况下,问题是拆分所有权,我们不得不使用 Arc 并承受 Arc 的代价。 + +不得不在安全性、便利性、灵活性、简单性和性能之间进行权衡是不幸的,但有时是不可避免的。Rust通常致力于在这些方面取得最佳表现,但有时为了最大化某个方面的优势,我们需要在其中做出一些妥协。 + ## 借用以避免分配 +我们刚刚基于 Arc 的 channel 实现的设计可以非常方便的使用——代价是一些性能,因为它得分配内存。如果我们想要优化效率,我们可以通过用户对共享的 Channel 对象负责来获取一些性能。我们可以强制用户去创建一个通过可以由 Sender 和 Receiver 借用的 Channel,而不是在幕后处理 Channel 的分配和所有权。这样,它们可以选择简单地放置 Channel 在局部变量中,从而避免分配内存的开销。 + +我们将也在一定程度上牺牲简洁性,因为我们现在不得不处理借用和生命周期。 + +因此,这三种类型现在看起来如下,Channel 再次公开,Sender 和 Receiver 借用它一段时间。 + +```rust +pub struct Channel { + message: UnsafeCell>, + ready: AtomicBool, +} + +unsafe impl Sync for Channel where T: Send {} + +pub struct Sender<'a, T> { + channel: &'a Channel, +} + +pub struct Receiver<'a, T> { + channel: &'a Channel, +} +``` + +我们没有使用 `channel()` 函数来创建一对 Sender 和 Receiver,而是回到本章节使用的 `Channel::new`,这允许用户为此类对象创建局部变量。 + +此外,我们需要一种方法,让用户创建将借用 Channel 的 Sender 和 Receiver 对象。这将需要是一个独占借用(`&mut Channel`),以确保同一 channel 不能有多个发送者或接收者。通过同时提供 Sender 和 Receiver,我们可以将独占引用*分成*两个共享借用,这样发送者和接收者都可以引用 channel,同时防止其他任何东西接触 channel。 + +这导致我们实现以下内容: + +```rust +impl Channel { + pub const fn new() -> Self { + Self { + message: UnsafeCell::new(MaybeUninit::uninit()), + ready: AtomicBool::new(false), + } + } + + pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) { + *self = Self::new(); + (Sender { channel: self }, Receiver { channel: self }) + } +} +``` + +`split` 方法使用一个极其复杂的签名,值得好好观察。它通过一个独占引用独占地借用 `self`,但它分成了两个共享引用,包装在 Sender 和 Receiver 类型中。`'a` 生命周期清楚地表明,这两个对象借用了有限的生命周期的东西;在这种情况下,是 Channel 本身的生命周期。由于 Channel 是独占地借用,只要 Sender 或 Receiver 对象存在,调用者不能去借用或者移动它。 + +然而,一旦这些对象都不再存在,可变的借用就会过期,编译器会愉快地让 Channel 对象通过第二次调用 `split()` 再次被借用。尽管我们可以假设在 Sender 和 Receiver 存在时,不能再次调用 `split()`,我们不能阻止在这些对象被 drop 或者遗忘后再次调用 `split()`。我们需要确保我们不能偶然地在 channel 已经有它的 ready 标志设置的情况下创建新的 Sender 或 Receiver 对象,因为这将打包阻止未定义行为的假设。 + +通过在 `split()` 中用新的空 channel 覆盖 `*self`,我们确保它在创建 Sender 和 Receiver 状态时处于预期状态。这也会在旧的 `*self` 上调用 Drop 实现,它将负责 drop 之前发送但从未接收的消息。 + +> 由于 split 的签名的生命周期来自 `self`,它可以被省略。上面片段的 `split` 签名与这个不太冗长的版本相同 +> +> ```rust +> pub fn split(&mut self) -> (Sender, Receiver) { … } +> ``` +> +> 虽然此版本没有明确显示返回的对象借用了 self,但编译器仍然与更冗长的版本完全一样检查生命周期的正确使用情况。 + +其余的方法和 Drop 实现与我们基于 Arc 的实现相同,除了 Sender 和 Receiver 类型的额外 `'_` 生命周期参数。(如果你忘记了这些,编译器会建议添加它们。) + +为了完全起效,以下是剩余的代码: + +```rust +impl Sender<'_, T> { + pub fn send(self, message: T) { + unsafe { (*self.channel.message.get()).write(message) }; + self.channel.ready.store(true, Release); + } +} + +impl Receiver<'_, T> { + pub fn is_ready(&self) -> bool { + self.channel.ready.load(Relaxed) + } + + pub fn receive(self) -> T { + if !self.channel.ready.swap(false, Acquire) { + panic!("no message available!"); + } + unsafe { (*self.channel.message.get()).assume_init_read() } + } +} + +impl Drop for Channel { + fn drop(&mut self) { + if *self.ready.get_mut() { + unsafe { self.message.get_mut().assume_init_drop() } + } + } +} +``` + +让我们来测试它! + +```rust +fn main() { + let mut channel = Channel::new(); + thread::scope(|s| { + let (sender, receiver) = channel.split(); + let t = thread::current(); + s.spawn(move || { + sender.send("hello world!"); + t.unpark(); + }); + while !receiver.is_ready() { + thread::park(); + } + assert_eq!(receiver.receive(), "hello world!"); + }); +} +``` + +与基于 Arc 的版本相比,便利性的减少非常小:我们只需要多一行代码来手动创建一个 Channel 对象。然而,请注意,channel 必须在作用域之前创建,以向编译器证明其存在超过 Sender 和 Receiver 的时间。 + +要查看编译器的借用检查器的实际操作,请尝试在各个地方添加对 `channel.split()` 的第二次调用。你将看到,在线程作用域内第二次调用它会导致错误,而在作用域之后调用它是可以接受的。即使在作用域之前调用 `split()` 也没问题,只要你在作用域开始之前停止使用返回的 Sender 和 Receiver 。 + ## 阻塞 +让我们最终处理一下我们 Channel 最后留下的最大不便,阻塞接口的缺乏。我们测试一个新的 channel 变体,每次都使用线程阻塞函数。将这种模式本身整合到 channel 应该不是太难。 + +为了能够释放接收者,发送者需要知道去释放哪个线程。`std::thread::Thread` 类型表示线程的句柄,正是我们调用 `unpark()` 所需要的。我们将把句柄存储到 Sender 对象内的接收线程,如下所示: + +```rust +use std::thread::Thread; + +pub struct Sender<'a, T> { + channel: &'a Channel, + receiving_thread: Thread, // New! +} +``` + +然而,如果 Receiver 对象在线程之间发送,该句柄将引用错误的线程。Sender 将不会意识到这个,并且仍然会参考最初持有 Receiver 的线程。 + +我们可以通过使 Receiver 更具限制性,不再允许它在线程之间发送来处理这个问题。正如[第1章“线程安全:Send 和 Sync”](./1_Basic_of_Rust_Concurrency.md#线程安全send-和-sync)中所讨论的,我们可以使用特殊的 `PhantomData` 标记类型将此限制添加到我们的结构中。`PhantomData<*const ()>` 将完成这项工作,因为原始指针,如 `*const ()`,不实现发送: + +```rust +pub struct Receiver<'a, T> { + channel: &'a Channel, + _no_send: PhantomData<*const ()>, // New! +} +``` + +接下来,我们必须修改 `Channel::split` 方法来填充新字段,例如: + +```rust + pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) { + *self = Self::new(); + ( + Sender { + channel: self, + receiving_thread: thread::current(), // New! + }, + Receiver { + channel: self, + _no_send: PhantomData, // New! + } + ) + } +``` + +我们使用当前线程的句柄来填充 `receiving_thread` 字段,因为我们返回的 Receiver 对象将保留在当前线程上。 + +正如以下展示的,`send` 方法并不做改变。我们仅在 `receiving_thread` 字段上调用 `unpark()` 去唤醒接收者,以防止它正在等待: + +```rust +impl Sender<'_, T> { + pub fn send(self, message: T) { + unsafe { (*self.channel.message.get()).write(message) }; + self.channel.ready.store(true, Release); + self.receiving_thread.unpark(); // New! + } +} +``` + +receive 函数发生的变化稍大。如果它仍然没有消息,新版本不会 panic,而是使用 `thread::park()` 等待消息并再次尝试,并根据需要多次重试。 + +```rust +impl Receiver<'_, T> { + pub fn receive(self) -> T { + while !self.channel.ready.swap(false, Acquire) { + thread::park(); + } + unsafe { (*self.channel.message.get()).assume_init_read() } + } +} +``` + +> 请记住,`thread::park()` 可能会虚假返回。(或者因为除了我们的 send 方法以外的其它原因调用了 `unpark()`。)这意味着我们不能假设 `park()` 返回时已经设置了 ready 标志。因此,我们需要使用一个循环,在唤醒后再次检查 ready 标识。 + +`Channel` 结构体、它的 Sync 实现、它的心函数以及它的 Drop 实现保持不变。 + +让我们尝试它! + +```rust +fn main() { + let mut channel = Channel::new(); + thread::scope(|s| { + let (sender, receiver) = channel.split(); + s.spawn(move || { + sender.send("hello world!"); + }); + assert_eq!(receiver.receive(), "hello world!"); + }); +} +``` + +显然,这个 Channel 比上一个 Channel 更方便使用,至少在这个简单的测试程序中是这样。我们不得不牺牲一些灵活性来创造这种便利性:只有调用 `split()` 的线程才能调用 `receive()`。如果你交换 send 和 receive 行,此程序将不再编译。根据用例,这可能完全没问题、有用或非常不方便。 + +确实,有许多方法解决这个问题,其中有很多胡增加一些额外的复杂度并影响一些性能。总的来说,我们可以继续探索的变种和权衡是无穷无尽的。 + +我们很容易花费大量的时间实现 20 个一次性 channel 不同的变体,每个变体都具有不同的属性,适用于每个可以想象到的用例甚至更多。尽管这听起来很有趣,但是我们应该避免陷入这个歧途,并在事情失控之前结束本章。 + ## 总结 +* *channel* 用于在线程之间发送*消息*。 +* 一个简单、灵活但可能效率低下的 channel,只需一个 `Mutex` 和 `Condvar` 就很容易实现。 +* *一次性*(one-shot)channel 是一个被设计仅发送一次信息的 channel。 +* `MaybeUninit` 类型可用于表示可能尚未初始化的 `T`。其接口大多不安全,使用户负责跟踪其是否已初始化,不要复制非 `Copy` 数据,并在必要时删除其内容。 +* 不 drop 对象(也称为泄漏或者遗忘)是安全的,但如果没有充分理由而这样做,会被视为不良的做法。 +* panic 是创建安全接口的重要工具。 +* 按值获取一个非 Copy 对象可以用于阻止某个操作被重复执行。 +* 独占借用和拆分借用是确保正确性的强大工具。 +* 我们可以确保对象的类型不实现 `Send`,确保它在同一个线程,这可以通过 `PhantomData` 标记实现。 +* 每个设计和实施决定都涉及权衡,最好在考虑特定用例的情况下做出。 +* 在没有用例的情况下设计一些东西可能是有趣的和有教育意义的,但是这可能是一个无止境的任务。 +

下一篇,第六章:构建我们自己的“Arc”

diff --git a/README.md b/README.md index fadb3f8..ec6279f 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ ## [第一章:Rust 并发基础](./1_Basic_of_Rust_Concurrency.md) * [Rust 中的线程](./1_Basic_of_Rust_Concurrency.md#rust-中的线程) -* [线程作用域](./1_Basic_of_Rust_Concurrency.md#线程作用域) +* [作用域内的线程](./1_Basic_of_Rust_Concurrency.md#作用域内的线程) * [共享所有权以及引用计数](./1_Basic_of_Rust_Concurrency.md#共享所有权以及引用计数) * [Static](./1_Basic_of_Rust_Concurrency.md#static) * [泄漏(Leak)](./1_Basic_of_Rust_Concurrency.md#泄漏leak)