forked from kartikhans/HotCarbon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMapReduceSort.py
42 lines (27 loc) · 1.06 KB
/
MapReduceSort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time
import random
from multiprocessing import Pool
import heapq
def generate_random_list(size):
    """Return the integers ``0 .. size - 1`` in a random order.

    Args:
        size: Number of elements to produce; also the exclusive upper bound
            of the values.

    Returns:
        A new list of ``size`` distinct integers drawn from ``range(size)``.
    """
    population = range(size)
    # Sampling the whole population without replacement yields a permutation.
    return random.sample(population, k=size)
class MapReduceSort:
    """Sort a randomly generated list of integers with a map/reduce scheme.

    The data is cut into fixed-size chunks, every chunk is sorted in a worker
    process (map step), and the sorted chunks are then merged into a single
    sorted list (reduce step).
    """

    def __init__(self, data_size, chunk_size, num_processes):
        """Generate the input data and store the run parameters.

        Args:
            data_size: Number of elements to generate with
                ``generate_random_list``.
            chunk_size: Maximum number of elements in each chunk handed to
                the mapper.
            num_processes: Number of worker processes passed to
                ``multiprocessing.Pool``.
        """
        self.chunk_size = chunk_size
        self.data = generate_random_list(data_size)
        self.num_processes = num_processes

    @staticmethod
    def mapper(chunk):
        """Return a sorted copy of ``chunk``.

        This is a static method on purpose: ``Pool.map`` pickles the callable
        it is given. A bound method would drag the whole instance -- including
        the complete ``data`` list -- into every task batch sent to a worker,
        whereas a plain function is pickled by reference only.
        """
        return sorted(chunk)

    def reducer(self, sorted_chunks):
        """Merge already-sorted chunks into one sorted list.

        Args:
            sorted_chunks: Iterable of individually sorted sequences.

        Returns:
            A list with all elements of all chunks in ascending order.
        """
        return list(heapq.merge(*sorted_chunks))

    def map_reducer(self):
        """Run the full sort over ``self.data`` and return the sorted list.

        Raises:
            ValueError: If ``chunk_size`` is not positive. A negative step
                would otherwise produce no chunks at all and silently return
                an empty result.
        """
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        chunks = [
            self.data[start:start + self.chunk_size]
            for start in range(0, len(self.data), self.chunk_size)
        ]
        with Pool(self.num_processes) as pool:
            # pool.map blocks until every chunk is sorted and keeps input order.
            sorted_chunks = pool.map(self.mapper, chunks)
        return self.reducer(sorted_chunks)
if __name__ == '__main__':
    # Benchmark: build a 5,000,000-element random list, sort it with
    # MapReduceSort, and print the elapsed time in seconds.
    chunk_size = 100
    num_processes = 4

    # perf_counter() is monotonic and high resolution, so the measured
    # interval cannot be distorted by system clock adjustments the way
    # time.time() can. The timed span covers data generation as well as
    # sorting, because the constructor creates the data.
    start_time = time.perf_counter()
    sorted_data = MapReduceSort(
        data_size=5000000,
        chunk_size=chunk_size,
        num_processes=num_processes,
    ).map_reducer()
    print(time.perf_counter() - start_time)