Skip to content

Commit

Permalink
queue example
Browse files Browse the repository at this point in the history
  • Loading branch information
bjudkewitz committed Sep 13, 2024
1 parent e3638ec commit 987f6cb
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@ from multiprocessing import Process
from dejaq import DejaQueue

def produce(queue):
for i in range(20):
random_shape = np.random.randint(5,10, size=3)
array = np.random.randn(*random_shape)
queue.put(array, meta=i)
print(f'produced {type(array)} {array.shape} {array.dtype}; meta: {i}; hash: {hash(array.tobytes())}\n')
for i in range(10):
arr = np.random.randn(100,200,300)
data = dict(array=arr, i=i)
queue.put(data)
print(f'produced {type(arr)} {arr.shape} {arr.dtype}; meta: {i}; hash: {hash(arr.tobytes())}\n', flush=True)

def consume(queue, pid):
while True:
array, meta = queue.get()
print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; meta: {meta}; hash: {hash(array.tobytes())}\n')
data = queue.get()
array, i = data['array'], data['i']
print(f'consumer {pid} consumed {type(array)} {array.shape} {array.dtype}; index: {i}; hash: {hash(array.tobytes())}\n', flush=True)

queue = DejaQueue(buffer_bytes=10e6)
queue = DejaQueue(buffer_bytes=100e6)
producer = Process(target=produce, args=(queue,))
consumers = [Process(target=consume, args=(queue, pid)) for pid in range(3)]
for c in consumers:
Expand Down

0 comments on commit 987f6cb

Please sign in to comment.