Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run work in different thread #8

Merged
merged 1 commit into from
Feb 6, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions message_queue/adapters/amqp_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

"""
import pika
import functools
import threading

from message_queue import logger
from message_queue.adapters import BaseAdapter
Expand All @@ -24,6 +26,7 @@ def __init__(self, host='localhost', port=5672, user='guest', password='guest',
:param string vhost: Server virutal host

"""
self.threads = []
self.queue = None
self._host = host
self._credentials = pika.PlainCredentials(user, password)
Expand Down Expand Up @@ -143,38 +146,49 @@ def consume(self, worker):
:param function worker: Method that consume the message

"""
callback = self.consume_callback(worker)
callback = functools.partial(self.consume_callback, worker=worker)
self.channel.basic_consume(callback, self.queue)

try:
self.channel.start_consuming()

except KeyboardInterrupt:
self.channel.stop_consuming()
self.close()

def consume_callback(self, worker):
"""Decorate worker to exectue on consume callback.
for thread in self.threads:
thread.join()

:param function worker: Worker to execture in the consume callback
self.close()

"""
def callback(channel, method, properties, body):
"""Message consume callback.
def consume_callback(self, channel, method, properties, body, worker):
"""Create a new thred.

:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param function worker: Worker to execture in the consume callback
"""
thread = threading.Thread(target=self.do_work, args=(channel, method, properties, body, worker))
thread.start()
self.threads.append(thread)

"""
# Execute the worker
acknowledge = worker(channel, method, properties, body)
def do_work(self, channel, method, properties, body, worker):
"""Execute worker

# Acknowledge the message or not
self._consume_acknowledge(channel, method.delivery_tag, acknowledge)
:param pika.channel.Channel channel: The channel object
:param pika.Spec.Basic.Deliver method: basic_deliver method
:param pika.Spec.BasicProperties properties: properties
:param str|unicode body: The message body
:param function worker: Worker to execture in the consume callback
"""
thread_id = threading.current_thread().ident
tag = method.delivery_tag
LOGGER.debug('Thread id: %r Delivery tag: %r Message body: %r', thread_id, tag, body)

return callback
acknowledge = worker(channel, method, properties, body)
callback = functools.partial(self._consume_acknowledge, channel, tag, acknowledge)
self.connection.add_callback_threadsafe(callback)

def _consume_acknowledge(self, channel, tag, acknowledge=True):
"""Message acknowledge.
Expand Down