用Rust多线程计算一定范围内的质数


背景引入

计算质数是学习编程入门的案例,也可能是初学者的噩梦。但是时间长了之后就习惯了。

比如打印1~1000以内的质数,或者范围更大。打印所有的质数、输出质数有多少个

这里我们可以尝试用多线程的方式,将一个大范围分成很多个小范围,然后用多线程的方式来计算。

rust语言也是一个性能非常高的语言,性能和效率仅次于C语言。同时也有很好的并发性能。然而python不适合写cpu密集型的程序,所以这里尝试一下Rust语言。

开始代码

执行以下python代码可知,0~2000的质数有303个

1
2
3
4
def isPrim(n):
return False if n < 2 else all(n % i != 0 for i in range(2, n))
c = sum(1 for num in range(2000) if isPrim(num))
print(c) # 303个!

可以把范围分成两段,0999 和 10001999

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
use std::thread;
fn main() {
println!("开始了");
let h1 = thread::spawn(t1);
let h2 = thread::spawn(t2);
h1.join().unwrap();
h2.join().unwrap();
println!("结束了");
// 选中可以看到打印了303行,结果正确
}

fn t1() {
for i in 0..1000 {
if is_prim(i) {
println!("{}", i);
}
}
}
fn t2() {
for i in 1000..2000 {
if is_prim(i) {
println!("{}", i);
}
}
}
fn is_prim(n: i32) -> bool {
if n < 2 {
return false;
} else {
for i in 2..n {
if n % i == 0 {
return false;
}
}
return true;
}
}

关键是线程函数没法带参数,这太恶心了。导致我分成两个范围代码重复严重。

但是函数可以套函数,这太好了。

先把上面的代码用匿名函数改成这样

join 方法可以使子线程运行结束后再停止运行程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::thread;
fn main() {
println!("开始了");
// let range_len = 10000;
let h1 = thread::spawn(|| {
for i in 0..1000 {
if is_prim(i) {
println!("{}", i);
}
}
});
let h2 = thread::spawn(|| {
for i in 1000..2000 {
if is_prim(i) {
println!("{}", i);
}
}
});
h1.join().unwrap();
h2.join().unwrap();
println!("结束了");
}

然后再把两个函数合并写成一个,但是这里就出问题了,匿名函数里访问了外部的t变量和rangelen变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::thread;
fn main() {
println!("开始了");
let range_len = 1000;
for t in 0..2 {
thread::spawn(|| {
for i in t * range_len..(t + 1) * range_len {
if is_prim(i) {
println!("{}", i);
}
}
})
.join()
.unwrap();
}
println!("结束了");
}

引用菜鸟教程的话

在子线程中尝试使用当前函数的资源,这一定是错误的!因为所有权机制禁止这种危险情况的产生,它将破坏所有权机制销毁资源的一定性。我们可以使用闭包的 move 关键字来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::thread;
fn main() {
println!("开始了");
let range_len = 1000;
for t in 0..2 {
thread::spawn(move || { // 添加move
for i in t * range_len..(t + 1) * range_len {
if is_prim(i) {
println!("{}", i);
}
}
})
.join()
.unwrap();
}
println!("结束了");
}

这样发现就好了,打印了303行。

接下来就可以加大数据量测试了。

打印了1229行。说明一万以内有1229个质数,经过相同算法逻辑的单线程python代码验证正确。

以下是继续优化了一下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn main() {
// 求 0 ~ limit 范围内有多少质数
let limit = 10000;

// 按照范围均等的分成 thread_num 段,每段单独一个线程
let thread_num = 10;

for t in 0..thread_num {
thread::spawn(move || {
for i in t * (limit / thread_num)..(t + 1) * (limit / thread_num) {
if is_prim(i) {
println!("{}", i);
}
}
})
.join()
.unwrap();
}
println!("结束了!");
}

现在又出现一个问题,打印太多行了,不方便统计,还得整体复制到txt里看看有多少行,应该添加一个cout变量记录有多少个质数,然后把拖慢速度的print去掉。

1
2
3
4
5
6
7
8
9
10
11
12
let mut count = 0;  // 直接声明一个变量
for t in 0..thread_num {
thread::spawn(move || {
for i in t * (limit / thread_num)..(t + 1) * (limit / thread_num) {
if is_prim(i) {
cout += 1; // 直接在这里写+1
}
}
})
.join()
.unwrap();
}

上述方法肯定是不行的,这里应该是涉及到了多个线程对外部变量进行修改的问题,而且还涉及到了闭包和变量的问题。

经过了一番大力的搜索,看案例学习了一下线程通信问题的问题,现在居然又有新的bug产生了,10个管道传送完了所有的数据之后居然还在等待。接收端的for循环没有结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use std::sync::mpsc;
use std::thread;

fn main() {
println!("开始了");
// 求 0 ~ limit 范围内有多少质数
let limit = 2000;

// 按照范围均等的分成 thread_num 段,每段单独一个线程
let thread_num = 10;

let mut count = 0; // 记录质数的数量

// 新建一个管道,管道两端是发送者和接受者
let (tx, rx) = mpsc::channel();

for t in 0..thread_num {
// 克隆管道,这个管道用于传输子线程发现的质数
let find_prim_pipe = mpsc::Sender::clone(&tx);

thread::spawn(move || {
for i in t * (limit / thread_num)..(t + 1) * (limit / thread_num) {
if is_prim(i) {
find_prim_pipe.send(i).unwrap();
}
}
println!("第{}个线程执行结束", t);
})
.join()
.unwrap();
}
println!("for 1");

for _ in rx {
// 这里可能不应该用for循环,会一直等待下去
count += 1;
println!("{} 增加中", count);
// if count == 303 {
// break;
// }
}

println!("for 2");

println!("{} 结束了!", count);
}

fn is_prim(n: i32) -> bool {
if n < 2 {
return false;
} else {
for i in 2..n {
if n % i == 0 {
return false;
}
}
return true;
}
}

问题应该是这样的,我创建了一个管道,然而我克隆了10个管道,我的10个子线程都用完了10个管道之后把这些管道都关闭了,然而还有那个主管道其实一直没有被我用。主管道的接收端就会一直在等待。

摆烂了,先暂时先做成这样。以后再优化。