线程同步:锁、Condvar 和信号量
在多线程编程中,同步性极其的重要,当你需要同时访问一个资源、控制不同线程的执行次序时,都需要使用到同步性。
在 Rust 中有多种方式可以实现同步性。在上一节中讲到的消息传递就是同步性的一种实现方式,例如我们可以通过消息传递来控制不同线程间的执行次序。还可以使用共享内存来实现同步性,例如通过锁和原子操作等并发原语来实现多个线程同时且安全地去访问一个资源。
该如何选择
共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:
- 共享内存相对消息传递能节省多次内存拷贝的成本
- 共享内存的实现简洁的多
- 共享内存的锁竞争更多
消息传递适用的场景很多,我们下面列出了几个主要的使用场景:
- 需要可靠和简单的(简单不等于简洁)实现时
- 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
- 需要一个任务处理流水线(管道)时,等等
而使用共享内存(并发原语)的场景往往就比较简单粗暴:需要简洁的实现以及更高的性能时。
总之,消息传递类似一个单所有权的系统:一个值同时只能有一个所有者,如果另一个线程需要该值的所有权,需要将所有权通过消息传递进行转移。而共享内存类似于一个多所有权的系统:多个线程可以同时访问同一个值。
互斥锁 Mutex
既然是共享内存,那并发原语自然是重中之重,先来一起看看皇冠上的明珠: 互斥锁Mutex
(mutual exclusion 的缩写)。
Mutex
让多个线程并发的访问同一个值变成了排队访问:同一时间,只允许一个线程A
访问该值,其它线程需要等待A
访问完成后才能继续。
单线程中使用 Mutex
先来看看单线程中Mutex
该如何使用:
use std::sync::Mutex; fn main() { // 使用`Mutex`结构体的关联函数创建新的互斥锁实例 let m = Mutex::new(5); { // 获取锁,然后deref为`m`的引用 // lock返回的是Result let mut num = m.lock().unwrap(); *num = 6; // 锁自动被drop } println!("m = {:?}", m); }
在注释中,已经大致描述了代码的功能,不过有一点需要注意:和Box
类似,数据被Mutex
所拥有,要访问内部的数据,需要使用方法m.lock()
向m
申请一个锁, 该方法会阻塞当前线程,直到获取到锁,因此当多个线程同时访问该数据时,只有一个线程能获取到锁,其它线程只能阻塞着等待,这样就保证了数据能被安全的修改!
m.lock()
方法也有可能报错,例如当前正在持有锁的线程panic
了。在这种情况下,其它线程不可能再获得锁,因此lock
方法会返回一个错误。
这里你可能奇怪,m.lock
明明返回一个锁,怎么就变成我们的num
数值了?聪明的读者可能会想到智能指针,没错,因为Mutex<T>
是一个智能指针,准确的说是m.lock()
返回一个智能指针MutexGuard<T>
:
- 它实现了
Deref
特征,会被自动解引用后获得一个引用类型,该引用指向Mutex
内部的数据 - 它还实现了
Drop
特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁
正因为智能指针的使用,使得我们无需任何操作就能获取其中的数据。 如果释放锁,你需要做的仅仅是做好锁的作用域管理,例如上述代码的内部花括号使用,建议读者尝试下去掉内部的花括号,然后再次尝试获取第二个锁num1
,看看会发生什么,友情提示:不会报错,但是主线程会永远阻塞,因为不幸发生了死锁。
use std::sync::Mutex; fn main() { let m = Mutex::new(5); let mut num = m.lock().unwrap(); *num = 6; // 锁还没有被 drop 就尝试申请下一个锁,导致主线程阻塞 // drop(num); // 手动 drop num ,可以让 num1 申请到下个锁 let mut num1 = m.lock().unwrap(); *num1 = 7; // drop(num1); // 手动 drop num1 ,观察打印结果的不同 println!("m = {:?}", m); }
多线程中使用 Mutex
单线程中使用锁,说实话纯粹是为了演示功能,毕竟多线程才是锁的舞台。 现在,我们再来看看,如何在多线程下使用Mutex
来访问同一个资源.
无法运行的Rc<T>
use std::rc::Rc; use std::sync::Mutex; use std::thread; fn main() { // 通过`Rc`实现`Mutex`的多所有权 let counter = Rc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Rc::clone(&counter); // 创建子线程,并将`Mutex`的所有权拷贝传入到子线程中 let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } // 等待所有子线程完成 for handle in handles { handle.join().unwrap(); } // 输出最终的计数结果 println!("Result: {}", *counter.lock().unwrap()); }
由于子线程需要通过move
拿走锁的所有权,因此我们需要使用多所有权来保证每个线程都拿到数据的独立所有权,恰好智能指针Rc<T>
可以做到(上面代码会报错!具体往下看,别跳过-, -)。
以上代码实现了在多线程中计数的功能,由于多个线程都需要去修改该计数器,因此我们需要使用锁来保证同一时间只有一个线程可以修改计数器,否则会导致脏数据:想象一下 A 线程和 B 线程同时拿到计数器,获取了当前值1
, 并且同时对其进行了修改,最后值变成2
,你会不会在风中凌乱?毕竟正确的值是3
,因为两个线程各自加 1。
可能有人会说,有那么巧的事情吗?事实上,对于人类来说,因为干啥啥慢,并没有那么多巧合,所以人总会存在巧合心理。但是对于计算机而言,每秒可以轻松运行上亿次,在这种频次下,一切巧合几乎都将必然发生,因此千万不要有任何侥幸心理。
如果事情有变坏的可能,不管这种可能性有多小,它都会发生! - 在计算机领域歪打正着的墨菲定律
事实上,上面的代码会报错:
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
// `Rc`无法在线程中安全的传输
--> src/main.rs:11:22
|
13 | let handle = thread::spawn(move || {
| ______________________^^^^^^^^^^^^^_-
| | |
| | `Rc<Mutex<i32>>` cannot be sent between threads safely
14 | | let mut num = counter.lock().unwrap();
15 | |
16 | | *num += 1;
17 | | });
| |_________- within this `[closure@src/main.rs:11:36: 15:10]`
|
= help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
// `Rc`没有实现`Send`特征
= note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
错误中提到了一个关键点:Rc<T>
无法在线程中传输,因为它没有实现Send
特征(在下一节将详细介绍),而该特征可以确保数据在线程中安全的传输。
多线程安全的 Arc<T>
好在,我们有Arc<T>
,得益于它的内部计数器是多线程安全的,因此可以在多线程环境中使用:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
以上代码可以顺利运行:
Result: 10
内部可变性
在之前章节,我们提到过内部可变性,其中Rc<T>
和RefCell<T>
的结合,可以实现单线程的内部可变性。
现在我们又有了新的武器,由于Mutex<T>
可以支持修改内部数据,当结合Arc<T>
一起使用时,可以实现多线程的内部可变性。
简单总结下:Rc<T>/RefCell<T>
用于单线程内部可变性, Arc<T>/Mutex<T>
用于多线程内部可变性。
需要小心使用的 Mutex
如果有其它语言的编程经验,就知道互斥锁这家伙不好对付,想要正确使用,你得牢记在心:
- 在使用数据前必须先获取锁
- 在数据使用完成后,必须及时的释放锁,比如文章开头的例子,使用内部语句块的目的就是为了及时的释放锁
这两点看起来不起眼,但要正确的使用,其实是相当不简单的,对于其它语言,忘记释放锁是经常发生的,虽然 Rust 通过智能指针的drop
机制帮助我们避免了这一点,但是由于不及时释放锁导致的性能问题也是常见的。
正因为这种困难性,导致很多用户都热衷于使用消息传递的方式来实现同步,例如 Go 语言直接把channel
内置在语言特性中,甚至还有无锁的语言,例如erlang
,完全使用Actor
模型,依赖消息传递来完成共享和同步。幸好 Rust 的类型系统、所有权机制、智能指针等可以很好的帮助我们减轻使用锁时的负担。
另一个值的注意的是在使用Mutex<T>
时,Rust 无法帮我们避免所有的逻辑错误,例如在之前章节,我们提到过使用Rc<T>
可能会导致循环引用的问题。类似的,Mutex<T>
也存在使用上的风险,例如创建死锁(deadlock):当一个操作试图锁住两个资源,然后两个线程各自获取其中一个锁,并试图获取另一个锁时,就会造成死锁。
死锁
在 Rust 中有多种方式可以创建死锁,了解这些方式有助于你提前规避可能的风险,一起来看看。
单线程死锁
这种死锁比较容易规避,但是当代码复杂后还是有可能遇到:
use std::sync::Mutex; fn main() { let data = Mutex::new(0); let d1 = data.lock(); let d2 = data.lock(); } // d1锁在此处释放
非常简单,只要你在另一个锁还未被释放时去申请新的锁,就会触发,当代码复杂后,这种情况可能就没有那么显眼。
多线程死锁
当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁:
use std::{sync::{Mutex, MutexGuard}, thread}; use std::thread::sleep; use std::time::Duration; use lazy_static::lazy_static; lazy_static! { static ref MUTEX1: Mutex<i64> = Mutex::new(0); static ref MUTEX2: Mutex<i64> = Mutex::new(0); } fn main() { // 存放子线程的句柄 let mut children = vec![]; for i_thread in 0..2 { children.push(thread::spawn(move || { for _ in 0..1 { // 线程1 if i_thread % 2 == 0 { // 锁住MUTEX1 let guard: MutexGuard<i64> = MUTEX1.lock().unwrap(); println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread); // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2 sleep(Duration::from_millis(10)); // 去锁MUTEX2 let guard = MUTEX2.lock().unwrap(); // 线程2 } else { // 锁住MUTEX2 let _guard = MUTEX2.lock().unwrap(); println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread); let _guard = MUTEX1.lock().unwrap(); } } })); } // 等子线程完成 for child in children { let _ = child.join(); } println!("死锁没有发生"); }
在上面的描述中,我们用了"可能"二字,原因在于死锁在这段代码中不是必然发生的,总有一次运行你能看到最后一行打印输出。这是由于子线程的初始化顺序和执行速度并不确定,我们无法确定哪个线程中的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。
但是,可以简单的说明下死锁发生的必然条件:线程 1 锁住了MUTEX1
并且线程2
锁住了MUTEX2
,然后线程 1 试图去访问MUTEX2
,同时线程2
试图去访问MUTEX1
,就会死锁。 因为线程 2 需要等待线程 1 释放MUTEX1
后,才会释放MUTEX2
,而与此同时,线程 1 需要等待线程 2 释放MUTEX2
后才能释放MUTEX1
,这种情况造成了两个线程都无法释放对方需要的锁,最终死锁。
那么为何某些时候,死锁不会发生?原因很简单,线程 2 在线程 1 锁MUTEX1
之前,就已经全部执行完了,随之线程 2 的MUTEX2
和MUTEX1
被全部释放,线程 1 对锁的获取将不再有竞争者。 同理,线程 1 若全部被执行完,那线程 2 也不会被锁,因此我们在线程 1 中间加一个睡眠,增加死锁发生的概率。如果你在线程 2 中同样的位置也增加一个睡眠,那死锁将必然发生!
try_lock
与lock
方法不同,try_lock
会尝试去获取一次锁,如果无法获取会返回一个错误,因此不会发生阻塞:
use std::{sync::{Mutex, MutexGuard}, thread}; use std::thread::sleep; use std::time::Duration; use lazy_static::lazy_static; lazy_static! { static ref MUTEX1: Mutex<i64> = Mutex::new(0); static ref MUTEX2: Mutex<i64> = Mutex::new(0); } fn main() { // 存放子线程的句柄 let mut children = vec![]; for i_thread in 0..2 { children.push(thread::spawn(move || { for _ in 0..1 { // 线程1 if i_thread % 2 == 0 { // 锁住MUTEX1 let guard: MutexGuard<i64> = MUTEX1.lock().unwrap(); println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread); // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2 sleep(Duration::from_millis(10)); // 去锁MUTEX2 let guard = MUTEX2.try_lock(); println!("线程 {} 获取 MUTEX2 锁的结果: {:?}", i_thread, guard); // 线程2 } else { // 锁住MUTEX2 let _guard = MUTEX2.lock().unwrap(); println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread); sleep(Duration::from_millis(10)); let guard = MUTEX1.try_lock(); println!("线程 {} 获取 MUTEX1 锁的结果: {:?}", i_thread, guard); } } })); } // 等子线程完成 for child in children { let _ = child.join(); } println!("死锁没有发生"); }
为了演示try_lock
的作用,我们特定使用了之前必定会死锁的代码,并且将lock
替换成try_lock
,与之前的结果不同,这段代码将不会再有死锁发生:
线程 0 锁住了MUTEX1,接着准备去锁MUTEX2 !
线程 1 锁住了MUTEX2, 准备去锁MUTEX1
线程 1 获取 MUTEX1 锁的结果: Err("WouldBlock")
线程 0 获取 MUTEX2 锁的结果: Ok(0)
死锁没有发生
如上所示,当try_lock
失败时,会报出一个错误:Err("WouldBlock")
,接着线程中的剩余代码会继续执行,不会被阻塞。
一个有趣的命名规则:在 Rust 标准库中,使用
try_xxx
都会尝试进行一次操作,如果无法完成,就立即返回,不会发生阻塞。例如消息传递章节中的try_recv
以及本章节中的try_lock
读写锁 RwLock
Mutex
会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex
就无法满足需求了,此时就可以使用RwLock
:
use std::sync::RwLock; fn main() { let lock = RwLock::new(5); // 同一时间允许多个读 { let r1 = lock.read().unwrap(); let r2 = lock.read().unwrap(); assert_eq!(*r1, 5); assert_eq!(*r2, 5); } // 读锁在此处被drop // 同一时间只允许一个写 { let mut w = lock.write().unwrap(); *w += 1; assert_eq!(*w, 6); // 以下代码会阻塞发生死锁,因为读和写不允许同时存在 // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中 // let r1 = lock.read(); // println!("{:?}",r1); }// 写锁在此处被drop }
RwLock
在使用上和Mutex
区别不大,只有在多个读的情况下不阻塞程序,其他如读写、写读、写写情况下均会对后获取锁的操作进行阻塞。
我们也可以使用try_write
和try_read
来尝试进行一次写/读,若失败则返回错误:
Err("WouldBlock")
简单总结下RwLock
:
- 同时允许多个读,但最多只能有一个写
- 读和写不能同时存在
- 读可以使用
read
、try_read
,写write
、try_write
, 在实际项目中,try_xxx
会安全的多
Mutex 还是 RwLock
首先简单性上Mutex
完胜,因为使用RwLock
你得操心几个问题:
- 读和写不能同时发生,如果使用
try_xxx
解决,就必须做大量的错误处理和失败重试机制 - 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败(writer starvation)
- RwLock 其实是操作系统提供的,实现原理要比
Mutex
复杂的多,因此单就锁的性能而言,比不上原生实现的Mutex
再来简单总结下两者的使用场景:
- 追求高并发读取时,使用
RwLock
,因为Mutex
一次只允许一个线程去读取 - 如果要保证写操作的成功性,使用
Mutex
- 不知道哪个合适,统一使用
Mutex
需要注意的是,RwLock
虽然看上去貌似提供了高并发读取的能力,但这个不能说明它的性能比Mutex
高,事实上Mutex
性能要好不少,后者唯一的问题也仅仅在于不能并发读取。
一个常见的、错误的使用RwLock
的场景就是使用HashMap
进行简单读写,因为HashMap
的读和写都非常快,RwLock
的复杂实现和相对低的性能反而会导致整体性能的降低,因此一般来说更适合使用Mutex
。
总之,如果你要使用RwLock
要确保满足以下两个条件:并发读,且需要对读到的资源进行"长时间"的操作,HashMap
也许满足了并发读的需求,但是往往并不能满足后者:"长时间"的操作。
benchmark 永远是你在迷茫时最好的朋友!
三方库提供的锁实现
标准库在设计时总会存在取舍,因为往往性能并不是最好的,如果你追求性能,可以使用三方库提供的并发原语:
- parking_lot, 功能更完善、稳定,社区较为活跃,star 较多,更新较为活跃
- spin, 在多数场景中性能比
parking_lot
高一点,最近没怎么更新
如果不是追求特别极致的性能,建议选择前者。
用条件变量(Condvar)控制线程的同步
Mutex
用于解决资源安全访问的问题,但是我们还需要一个手段来解决资源访问顺序的问题。而 Rust 考虑到了这一点,为我们提供了条件变量(Condition Variables),它经常和Mutex
一起使用,可以让线程挂起,直到某个条件发生后再继续执行,其实Condvar
我们在之前的多线程章节就已经见到过,现在再来看一个不同的例子:
use std::sync::{Arc,Mutex,Condvar}; use std::thread::{spawn,sleep}; use std::time::Duration; fn main() { let flag = Arc::new(Mutex::new(false)); let cond = Arc::new(Condvar::new()); let cflag = flag.clone(); let ccond = cond.clone(); let hdl = spawn(move || { let mut lock = cflag.lock().unwrap(); let mut counter = 0; while counter < 3 { while !*lock { // wait方法会接收一个MutexGuard<'a, T>,且它会自动地暂时释放这个锁,使其他线程可以拿到锁并进行数据更新。 // 同时当前线程在此处会被阻塞,直到被其他地方notify后,它会将原本的MutexGuard<'a, T>还给我们,即重新获取到了锁,同时唤醒了此线程。 lock = ccond.wait(lock).unwrap(); } *lock = false; counter += 1; println!("inner counter: {}", counter); } }); let mut counter = 0; loop { sleep(Duration::from_millis(1000)); *flag.lock().unwrap() = true; counter += 1; if counter > 3 { break; } println!("outside counter: {}", counter); cond.notify_one(); } hdl.join().unwrap(); println!("{:?}", flag); }
例子中通过主线程来触发子线程实现交替打印输出:
outside counter: 1
inner counter: 1
outside counter: 2
inner counter: 2
outside counter: 3
inner counter: 3
Mutex { data: true, poisoned: false, .. }
信号量 Semaphore
在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。想象一下,当一个新游戏刚开服时(有些较火的老游戏也会,比如wow
),往往会控制游戏内玩家的同时在线数,一旦超过某个临界值,就开始进行排队进服。而在实际使用中,也有很多时候,我们需要通过信号量来控制最大并发数,防止服务器资源被撑爆。
本来 Rust 在标准库中有提供一个信号量实现, 但是由于各种原因这个库现在已经不再推荐使用了,因此我们推荐使用tokio
中提供的Semaphore
实现: tokio::sync::Semaphore
。
use std::sync::Arc; use tokio::sync::Semaphore; #[tokio::main] async fn main() { let semaphore = Arc::new(Semaphore::new(3)); let mut join_handles = Vec::new(); for _ in 0..5 { let permit = semaphore.clone().acquire_owned().await.unwrap(); join_handles.push(tokio::spawn(async move { // // 在这里执行任务... // drop(permit); })); } for handle in join_handles { handle.await.unwrap(); } }
上面代码创建了一个容量为 3 的信号量,当正在执行的任务超过 3 时,剩下的任务需要等待正在执行任务完成并减少信号量后到 3 以内时,才能继续执行。
这里的关键其实说白了就在于:信号量的申请和归还,使用前需要申请信号量,如果容量满了,就需要等待;使用后需要释放信号量,以便其它等待者可以继续。
总结
在很多时候,消息传递都是非常好用的手段,它可以让我们的数据在任务流水线上不断流转,实现起来非常优雅。
但是它并不能优雅的解决所有问题,因为我们面临的真实世界是非常复杂的,无法用某一种银弹统一解决。当面临消息传递不太适用的场景时,或者需要更好的性能和简洁性时,我们往往需要用锁来解决这些问题,因为锁允许多个线程同时访问同一个资源,简单粗暴。
除了锁之外,其实还有一种并发原语可以帮助我们解决并发访问数据的问题,那就是原子类型 Atomic,在下一章节中,我们会对其进行深入讲解。