Skip to content

Commit

Permalink
Merge pull request #8 from ingresse/feature/run-work-in-different-thread
Browse files Browse the repository at this point in the history
Run work in different thread
  • Loading branch information
hugofcampos authored Feb 6, 2019
2 parents d92c054 + c454f9b commit 6e12818
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions message_queue/adapters/amqp_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""
import pika
import functools
import threading

from message_queue import logger
from message_queue.adapters import BaseAdapter
Expand All @@ -24,6 +26,7 @@ def __init__(self, host='localhost', port=5672, user='guest', password='guest',
:param string vhost: Server virutal host
"""
self.threads = []
self.queue = None
self._host = host
self._credentials = pika.PlainCredentials(user, password)
Expand Down Expand Up @@ -143,38 +146,49 @@ def consume(self, worker):
:param function worker: Method that consume the message
"""
callback = self.consume_callback(worker)
callback = functools.partial(self.consume_callback, worker=worker)
self.channel.basic_consume(callback, self.queue)

try:
self.channel.start_consuming()

except KeyboardInterrupt:
self.channel.stop_consuming()
self.close()

def consume_callback(self, worker):
"""Decorate worker to exectue on consume callback.
for thread in self.threads:
thread.join()

:param function worker: Worker to execture in the consume callback
self.close()

"""
def callback(channel, method, properties, body):
"""Message consume callback.
def consume_callback(self, channel, method, properties, body, worker):
"""Create a new thred.
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param function worker: Worker to execture in the consume callback
"""
thread = threading.Thread(target=self.do_work, args=(channel, method, properties, body, worker))
thread.start()
self.threads.append(thread)

"""
# Execute the worker
acknowledge = worker(channel, method, properties, body)
def do_work(self, channel, method, properties, body, worker):
"""Execute worker
# Acknowledge the message or not
self._consume_acknowledge(channel, method.delivery_tag, acknowledge)
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param function worker: Worker to execture in the consume callback
"""
thread_id = threading.current_thread().ident
tag = method.delivery_tag
LOGGER.debug('Thread id: %r Delivery tag: %r Message body: %r', thread_id, tag, body)

return callback
acknowledge = worker(channel, method, properties, body)
callback = functools.partial(self._consume_acknowledge, channel, tag, acknowledge)
self.connection.add_callback_threadsafe(callback)

def _consume_acknowledge(self, channel, tag, acknowledge=True):
"""Message acknowledge.
Expand Down

0 comments on commit 6e12818

Please sign in to comment.