-
Notifications
You must be signed in to change notification settings - Fork 0
/
10-queue.py
49 lines (39 loc) · 1.31 KB
/
10-queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import random
import concurrent.futures
import time
import threading
import queue
# Producer-consumer pipeline, refactored to use queue.Queue.
class Pipeline(queue.Queue):
    """Bounded FIFO queue that logs and records every message it carries.

    Messages handed to ``put_message`` are appended to the module-level
    ``producer_pipeline`` list; messages returned by ``get_message`` are
    appended to the module-level ``consumer_pipeline`` list.
    """

    def __init__(self, maxsize=20):
        """Create the queue.

        Args:
            maxsize: Upper bound on the number of queued messages, forwarded
                to ``queue.Queue``. Defaults to 20, the capacity that was
                previously hard-coded.
        """
        super().__init__(maxsize=maxsize)

    def put_message(self, message):
        """Log ``message``, record it as produced, then enqueue it.

        The message is printed and recorded *before* ``put`` is called, and
        ``put`` blocks while the queue is full, so a message can appear in
        ``producer_pipeline`` slightly before it is actually in the queue.

        Args:
            message: The item to enqueue.
        """
        print(f'producing message: {message}')
        producer_pipeline.append(message)
        self.put(message)

    def get_message(self):
        """Dequeue the next message, log it, and record it as consumed.

        Blocks until a message is available.

        Returns:
            The message removed from the queue.
        """
        message = self.get()
        print(f'consuming message: {message}')
        consumer_pipeline.append(message)
        return message
def producer(pipeline, event):
    """Feed random integers into ``pipeline`` until ``event`` is set.

    The stop flag is checked before every message. Each message is an
    integer from ``random.randint(1, 100)`` and is handed to
    ``pipeline.put_message``.

    Args:
        pipeline: Object exposing ``put_message(message)``.
        event: Stop flag exposing ``is_set()``; production ends once it is set.
    """
    while True:
        if event.is_set():
            break
        pipeline.put_message(random.randint(1, 100))
def consumer(pipeline, event):
    """Drain ``pipeline`` until it is empty and ``event`` has been set.

    While messages are queued, each iteration prints the current queue size,
    takes one message via ``pipeline.get_message()``, and then sleeps for a
    random fraction of a second.

    Args:
        pipeline: Object exposing ``empty()``, ``qsize()`` and
            ``get_message()``.
        event: Stop flag exposing ``is_set()`` and ``wait(timeout)``; the
            loop ends once it is set and the queue is empty.
    """
    poll_interval = 0.01  # seconds to wait before re-checking an empty queue

    while not pipeline.empty() or not event.is_set():
        if pipeline.empty():
            # Nothing to take yet. get_message() blocks with no timeout, so
            # calling it here could hang forever if the producer stops before
            # adding another item. Wait briefly on the stop flag instead and
            # re-evaluate the loop condition.
            # NOTE(review): this check is only race-free with a single
            # consumer; with several, another consumer could take the item
            # between this check and get_message().
            event.wait(poll_interval)
            continue
        print(f'queue size is {pipeline.qsize()}')
        pipeline.get_message()
        time.sleep(random.random())
# Histories of every message produced and consumed; Pipeline.put_message and
# Pipeline.get_message append to these lists.
producer_pipeline = []
consumer_pipeline = []

if __name__ == '__main__':
    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        # Keep the futures so worker exceptions can be surfaced below instead
        # of being silently discarded.
        futures = [
            ex.submit(producer, pipeline, event),
            ex.submit(consumer, pipeline, event),
        ]
        try:
            time.sleep(0.5)
        finally:
            # Always signal shutdown, even if the sleep is interrupted;
            # otherwise leaving the ``with`` block would wait forever on a
            # producer that never stops.
            event.set()
    # Leaving the ``with`` block waits for both workers to finish.
    print(f'producer pipeline: {producer_pipeline}')
    print(f'consumer pipeline: {consumer_pipeline}')
    # Re-raise any exception that occurred inside a worker thread.
    for future in futures:
        future.result()