多线程并发编程-Rust

并发和并行

咖啡机就是我们计算机核心,只有一个核心的情况下还想同时处理多任务只能是两个队伍共用一个核心

image-20250429090255514

  • 并发(Concurrent) 是多个队列使用同一个咖啡机,然后两个队列轮换着使用(未必是 1:1 轮换,也可能是其它轮换规则),最终每个人都能接到咖啡,快速轮换处理不同的任务,线程负责管理任务队列
  • 并行(Parallel) 是每个队列都拥有一个咖啡机,最终也是每个人都能接到咖啡,但是效率更高,因为同时可以有两个人在接咖啡
  • 串行 只有一个队列且仅使用一台咖啡机,前面哪个人接咖啡时突然发呆了几分钟,后面的人就只能等他结束才能继续接。

并发和并行都是对“多任务”处理的描述,其中并发是轮流处理,而并行是同时处理

使用多线程

多线程编程的风险

由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:

  • 竞态条件(race conditions),多个线程以非一致性的顺序同时访问数据资源
  • 死锁(deadlocks),两个线程都想使用某个资源,但是又都在等待对方释放资源后才能使用,结果最终都无法继续执行
  • 一些因为多线程导致的很隐晦的 BUG,难以复现和解决
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::thread;
use std::time::Duration;

fn main() {
thread::spawn(|| {//线程并未执行完毕,main函数结束线程就会自动结束
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}

image-20250429092654350

等待线程结束

调用 handle.join,可以让当前线程阻塞,直到它等待的子线程的结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {//线程并未执行完毕,main函数结束线程就会自动结束
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}

在线程闭包中使用 move

线程的启动时间点和结束时间点是不确定的

1
2
3
4
5
6
7
8
9
10
11
use std::thread;

fn main() {
let v = vec![1, 2, 3];

let handle = thread::spawn(|| {//闭包捕获主线程的值。但是没有获取所有权
println!("Here's a vector: {:?}", v);//编译器没有办法判断主线程是否会提前结束,所以不会通过
});//v 被释放掉时,新的线程很可能还没有结束甚至还没有被创建成功,此时新线程对 v 的引用立刻就不再合法

handle.join().unwrap();
}

修改后的代码

1
2
3
4
5
6
7
8
9
10
11
use std::thread;

fn main() {
let v = vec![1, 2, 3];

let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});

handle.join().unwrap();
}

线程同步

消息传递

channel,发送端(transmitter)和接收端(receiver),任意一端被丢弃通道都会直接关闭

单发送者,单接受者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::sync::mpsc;
use std::thread;

fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();

// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();

// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});

// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
  • tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender<i32>mpsc::Receiver<i32>类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
  • 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
  • 需要使用movetx的所有权转移到子线程的闭包中

rx.recv()阻塞线程,直到接收到一个值,返回result,当所有发送端关闭,返回一个错误,适用于主线程没有额外工作的情况

rx.try_recv()不阻塞线程,直接返回result,有消息返回ok包含的内容,没有消息直接err

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

多发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});

thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});

for received in rx {
println!("Got: {}", received);
}
}

共享内存

多个线程访问共享数据

mutex互斥锁

使用之前尝试获取lock,使用之后解锁lock以便其它线程可以使用

单线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

let mut num = m.lock().unwrap();
*num = 6;
// 锁还没有被 drop 就尝试申请下一个锁,导致主线程阻塞
// drop(num); // 手动 drop num ,可以让 num1 申请到下个锁
let mut num1 = m.lock().unwrap();
*num1 = 7;
// drop(num1); // 手动 drop num1 ,观察打印结果的不同

println!("m = {:?}", m);
}

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
// 通过`Rc`实现`Mutex`的多所有权
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Rc::clone(&counter);
// 创建子线程,并将`Mutex`的所有权拷贝传入到子线程中
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

// 等待所有子线程完成
for handle in handles {
handle.join().unwrap();
}

// 输出最终的计数结果
println!("Result: {}", *counter.lock().unwrap());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

Rc<T>/RefCell<T>用于单线程内部可变性, Arc<T>/Mutex<T>用于多线程内部可变性