Skip to content

Commit

Permalink
feat: support main process consumption when prefetch_count == 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mcPear committed Dec 21, 2023
1 parent 99198f5 commit 570fa30
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 34 deletions.
80 changes: 48 additions & 32 deletions pikachu/basic_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pikachu.cuda_utils import is_unknown_cuda_error
from pikachu.client import AMQPClient
import traceback
from pebble import ProcessPool
from pebble import ProcessPool, ProcessFuture
import functools

DEFAULT_BROKER_TIMEOUT = (
Expand Down Expand Up @@ -41,24 +41,29 @@ def handle_result(
consumed_message,
request_id_name,
logger,
future,
result,
):
method, properties, message_txt = consumed_message
delivery_tag = method.delivery_tag
message_json = loads(message_txt)
request_id = message_json[request_id_name]
try:
result_dict = future.result()
if isinstance(result, ProcessFuture):
result = future.result()
logger.info(f"[*] Done request id: {request_id}.")
result_dict.update({request_id_name: request_id})
client.publish_and_ack(delivery_tag, properties, dumps(result_dict))
result.update({request_id_name: request_id})
client.publish_and_ack(delivery_tag, properties, dumps(result))
except Exception as e:
if is_unknown_cuda_error(e):
# if something is wrong with CUDA, further consuming is pointless
#
# when prefetch_count == 1 and there's no pool of processes, we pass the error from here to kill the app
#
# when prefetch_count > 1 and we have a pool of processes
# we find this exception from consumer process and raise to restart the app
# we can't raise it from here and restart the app by design:
# we can't raise it from here (it will be ignored) and restart the app by design:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.add_done_callback
pass
raise e
elif not method.redelivered:
logger.error(
f"[*] Failed {request_id_name}: {request_id}. Redelivering.",
Expand All @@ -85,35 +90,46 @@ def start(
timeout=DEFAULT_BROKER_TIMEOUT,
):
client = AMQPClient.from_config()
mp_context = get_multiprocessing_context()

with ProcessPool(
client.get_prefetch_count(),
context=mp_context,
) as pool:
for message in client.consume(MAINTENANCE_TIMEOUT):
propagate_callback_cuda_exceptions(pool)

if message == MAINTENANCE_MESSAGE:
continue
prefetch_count = client.get_prefetch_count()

if prefetch_count == 1:
for message in client.consume(None):
method, properties, message_txt = message
message_json = loads(message_txt)
request_id = message_json[request_id_name]
logger.info(f"[*] Received {request_id_name}: {request_id}.")

future = pool.schedule(
message_function,
(message_json, models),
timeout=timeout,
)
done_callback = functools.partial(
handle_result,
client,
message,
request_id_name,
logger,
)
future.add_done_callback(done_callback)
result = message_function(message_json, models)
handle_result(client, message, request_id_name, logger, result)

elif prefetch_count > 1:
mp_context = get_multiprocessing_context()
with ProcessPool(
prefetch_count,
context=mp_context,
) as pool:
for message in client.consume(MAINTENANCE_TIMEOUT):
propagate_callback_cuda_exceptions(pool)

if message == MAINTENANCE_MESSAGE:
continue

method, properties, message_txt = message
message_json = loads(message_txt)
request_id = message_json[request_id_name]
logger.info(f"[*] Received {request_id_name}: {request_id}.")

future = pool.schedule(
message_function,
(message_json, models),
timeout=timeout,
)
done_callback = functools.partial(
handle_result,
client,
message,
request_id_name,
logger,
)
future.add_done_callback(done_callback)

client.teardown()
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

setuptools.setup(
name="pikachu",
version="1.2.1",
version="1.3",
author="Maciej Gruszczyński",
author_email="maciejgruszczysnki@surferseo.com",
author_email="maciejgruszczynski@surferseo.com",
description="Wrapper around pika inspired by lapin for convenient AMQP operations in Python APIs",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 570fa30

Please sign in to comment.