-
Notifications
You must be signed in to change notification settings - Fork 7
/
merge_sort.rs
140 lines (123 loc) · 5.22 KB
/
merge_sort.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use concurrent_slice::{Chunk, ConcurrentSlice};
use futures::{stream, stream::StreamExt as _};
use itertools::izip;
use par_stream::prelude::*;
use rand::prelude::*;
use std::time::Instant;
/// Number of `i32` elements in the array that is randomly filled and then sorted.
const LEN: usize = 100_000_000;
/// Chunk length handed to `concurrent_chunks`; every resulting chunk is sorted
/// on its own before the pairwise merge passes begin.
const MIN_CHUNK_LEN: usize = 2048;
/// Merges the two sorted slices `left` and `right` into `out`.
///
/// Ties are resolved in favour of `left`, so the merge is stable.
///
/// # Panics
///
/// Panics if `out.len() != left.len() + right.len()`.
fn merge_sorted<T: Copy + PartialOrd>(left: &[T], right: &[T], out: &mut [T]) {
    assert_eq!(
        out.len(),
        left.len() + right.len(),
        "merge target must be exactly as long as both sources combined"
    );

    let (mut l, mut r, mut o) = (0, 0, 0);
    while l < left.len() && r < right.len() {
        // `<=` (not `<`) so that equal elements keep their left-before-right order
        if left[l] <= right[r] {
            out[o] = left[l];
            l += 1;
        } else {
            out[o] = right[r];
            r += 1;
        }
        o += 1;
    }

    // At most one source still has elements; bulk-copy whatever is left.
    // The length assertion above guarantees the remaining target space matches.
    let rest = if l < left.len() {
        &left[l..]
    } else {
        &right[r..]
    };
    out[o..].copy_from_slice(rest);
}

/// Benchmarks a chunked parallel merge sort against `slice::sort`.
///
/// Steps, each timed and reported on stderr:
/// 1. fill a `LEN`-element `Vec<i32>` with random numbers in parallel;
/// 2. sort a clone of it with the standard library sort;
/// 3. sort the original with a parallel bottom-up merge sort that ping-pongs
///    between the data buffer and a scratch buffer of the same length.
///
/// Finally asserts that both sorted outputs are equal.
fn main() {
    par_stream::rt::block_on_executor(async move {
        // fill array with random numbers
        let instant = Instant::now();
        let array = {
            let array = vec![0i32; LEN];
            // Split the vector into owned chunks and fill each one on a worker.
            // Completion order is irrelevant here, hence the unordered variant.
            let chunks: Vec<_> = stream::iter(array.concurrent_chunks_by_division(None))
                .par_map_unordered(None, |mut chunk| {
                    move || {
                        let mut rng = rand::thread_rng();
                        chunk.iter_mut().for_each(|elem| *elem = rng.gen());
                        chunk
                    }
                })
                .collect()
                .await;

            // Recover the owned vector: keep a guard, release every chunk,
            // then unwrap the guard.
            let guard = chunks[0].guard();
            drop(chunks);
            guard
                .try_unwrap()
                .expect("failed to recover the random vector from its chunk guard")
        };
        eprintln!("random vec generation:\t{:?}", instant.elapsed());

        // benchmark non-concurrent sorting
        // (the clone is taken before the timer starts so it is not measured)
        let mut array_std = array.clone();
        let instant = Instant::now();
        let array_std = {
            array_std.sort();
            array_std
        };
        eprintln!("std sort:\t{:?}", instant.elapsed());

        // parallel merge sort
        let instant = Instant::now();
        let array_concurrent = {
            // generate chunks: `chunks0` holds the data, `chunks1` is a scratch
            // buffer chunked the same way and used as the merge target
            let chunks0: Vec<_> = array.concurrent_chunks(MIN_CHUNK_LEN).collect();
            let mut chunks1: Vec<_> = vec![0; LEN].concurrent_chunks(MIN_CHUNK_LEN).collect();

            // sort within chunks (ordered `par_map`, so chunk positions are kept)
            let mut chunks0: Vec<_> = stream::iter(chunks0)
                .par_map(None, |mut chunk| {
                    move || {
                        chunk.sort();
                        chunk
                    }
                })
                .collect()
                .await;

            // merge sort: every pass merges neighbouring chunk pairs from the
            // source buffer into the target buffer, halving the chunk count
            while chunks0.len() > 1 {
                let (chunks0_, chunks1_): (Vec<_>, Vec<_>) = stream::iter(izip!(chunks0, chunks1))
                    .chunks(2)
                    .par_map(None, |pair| {
                        move || {
                            let mut pair = pair.into_iter();
                            let (lchunk0, mut lchunk1) = pair
                                .next()
                                .expect("`chunks(2)` yielded an empty group");
                            let (rchunk0, rchunk1) = match pair.next() {
                                Some(chunk) => chunk, // non-tail case
                                None => {
                                    // tail case: an unpaired last chunk is copied
                                    // over verbatim so the target buffer stays complete
                                    lchunk1.copy_from_slice(&*lchunk0);
                                    return (lchunk0, lchunk1);
                                }
                            };

                            // join the two target chunks and merge both sorted
                            // source chunks into the joined target
                            let mut chunk1 = Chunk::cat(vec![lchunk1, rchunk1]);
                            merge_sorted(&*lchunk0, &*rchunk0, &mut *chunk1);

                            // merge source chunks so both buffers stay chunked identically
                            let chunk0 = Chunk::cat(vec![lchunk0, rchunk0]);
                            (chunk0, chunk1)
                        }
                    })
                    .unzip()
                    .await;

                // swap roles: the freshly written target becomes the next source
                chunks0 = chunks1_;
                chunks1 = chunks0_;
            }

            // merge chunks back to array
            let guard = chunks0[0].guard();
            drop(chunks0);
            guard
                .try_unwrap()
                .expect("failed to recover the sorted vector from its chunk guard")
        };
        eprintln!("merge sort:\t{:?}", instant.elapsed());

        // verify results
        assert_eq!(
            array_std, array_concurrent,
            "the outputs of std sort and merge-sort differ"
        );
    });
}