编程语言 多线程并发编程-Rust xiu 2025-04-29 2025-04-29 并发和并行 咖啡机就是我们计算机核心,只有一个核心的情况下还想同时处理多任务只能是两个队伍共用一个核心
并发(Concurrent) 是多个队列使用同一个咖啡机,然后两个队列轮换着使用(未必是 1:1 轮换,也可能是其它轮换规则),最终每个人都能接到咖啡,快速轮换处理不同的任务,线程负责管理任务队列
并行(Parallel) 是每个队列都拥有一个咖啡机,最终也是每个人都能接到咖啡,但是效率更高,因为同时可以有两个人在接咖啡
串行 只有一个队列且仅使用一台咖啡机,前面哪个人接咖啡时突然发呆了几分钟,后面的人就只能等他结束才能继续接。
并发和并行都是对“多任务”处理的描述,其中并发是轮流处理,而并行是同时处理 。
使用多线程 多线程编程的风险 由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:
竞态条件(race conditions),多个线程以非一致性的顺序同时访问数据资源
死锁(deadlocks),两个线程都想使用某个资源,但是又都在等待对方释放资源后才能使用,结果最终都无法继续执行
一些因为多线程导致的很隐晦的 BUG,难以复现和解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 use std::thread;use std::time::Duration;fn main () { thread::spawn (|| { for i in 1 ..10 { println! ("hi number {} from the spawned thread!" , i); thread::sleep (Duration::from_millis (1 )); } }); for i in 1 ..5 { println! ("hi number {} from the main thread!" , i); thread::sleep (Duration::from_millis (1 )); } }
等待线程结束 调用 handle.join
,可以让当前线程阻塞,直到它等待的子线程的结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 use std::thread;use std::time::Duration;fn main () { let handle = thread::spawn (|| { for i in 1 ..10 { println! ("hi number {} from the spawned thread!" , i); thread::sleep (Duration::from_millis (1 )); } }); handle.join ().unwrap (); for i in 1 ..5 { println! ("hi number {} from the main thread!" , i); thread::sleep (Duration::from_millis (1 )); } }
在线程闭包中使用 move 线程的启动时间点和结束时间点是不确定的
1 2 3 4 5 6 7 8 9 10 11 use std::thread;fn main () { let v = vec! [1 , 2 , 3 ]; let handle = thread::spawn (|| { println! ("Here's a vector: {:?}" , v); }); handle.join ().unwrap (); }
修改后的代码
1 2 3 4 5 6 7 8 9 10 11 use std::thread;fn main () { let v = vec! [1 , 2 , 3 ]; let handle = thread::spawn (move || { println! ("Here's a vector: {:?}" , v); }); handle.join ().unwrap (); }
线程同步 消息传递 channel,发送端(transmitter)和接收端(receiver),任意一端被丢弃通道都会直接关闭
单发送者,单接受者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 use std::sync::mpsc;use std::thread;fn main () { let (tx, rx) = mpsc::channel (); thread::spawn (move || { tx.send (1 ).unwrap (); }); println! ("receive {}" , rx.recv ().unwrap ()); }
tx
,rx
对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)
发送了整数,因此它们分别是mpsc::Sender<i32>
和mpsc::Receiver<i32>
类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32
类型的值将导致编译错误
接收消息的操作rx.recv()
会阻塞当前线程,直到读取到值,或者通道被关闭
需要使用move
将tx
的所有权转移到子线程的闭包中
rx.recv()阻塞线程,直到接收到一个值,返回result,当所有发送端关闭,返回一个错误,适用于主线程没有额外工作的情况
rx.try_recv()不阻塞线程,直接返回result,有消息返回ok包含的内容,没有消息直接err
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 use std::sync::mpsc;use std::thread;use std::time::Duration;fn main () { let (tx, rx) = mpsc::channel (); thread::spawn (move || { let vals = vec! [ String ::from ("hi" ), String ::from ("from" ), String ::from ("the" ), String ::from ("thread" ), ]; for val in vals { tx.send (val).unwrap (); thread::sleep (Duration::from_secs (1 )); } }); for received in rx { println! ("Got: {}" , received); } }
多发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 use std::sync::mpsc;use std::thread;fn main () { let (tx, rx) = mpsc::channel (); let tx1 = tx.clone (); thread::spawn (move || { tx.send (String ::from ("hi from raw tx" )).unwrap (); }); thread::spawn (move || { tx1.send (String ::from ("hi from cloned tx" )).unwrap (); }); for received in rx { println! ("Got: {}" , received); } }
共享内存 多个线程访问共享数据
mutex互斥锁 使用之前尝试获取lock,使用之后解锁lock以便其它线程可以使用
单线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 use std::sync::Mutex;fn main () { let m = Mutex::new (5 ); let mut num = m.lock ().unwrap (); *num = 6 ; let mut num1 = m.lock ().unwrap (); *num1 = 7 ; println! ("m = {:?}" , m); }
多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 use std::rc::Rc;use std::sync::Mutex;use std::thread;fn main () { let counter = Rc::new (Mutex::new (0 )); let mut handles = vec! []; for _ in 0 ..10 { let counter = Rc::clone (&counter); let handle = thread::spawn (move || { let mut num = counter.lock ().unwrap (); *num += 1 ; }); handles.push (handle); } for handle in handles { handle.join ().unwrap (); } println! ("Result: {}" , *counter.lock ().unwrap ()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 use std::sync::{Arc, Mutex};use std::thread;fn main () { let counter = Arc::new (Mutex::new (0 )); let mut handles = vec! []; for _ in 0 ..10 { let counter = Arc::clone (&counter); let handle = thread::spawn (move || { let mut num = counter.lock ().unwrap (); *num += 1 ; }); handles.push (handle); } for handle in handles { handle.join ().unwrap (); } println! ("Result: {}" , *counter.lock ().unwrap ()); }
Rc<T>/RefCell<T>
用于单线程内部可变性, Arc<T>/Mutex<T>
用于多线程内部可变性