Skip to content

Commit

Permalink
correct kombu implementation of a fanout queue
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Dec 30, 2015
1 parent 12dfcf8 commit 73fd499
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pickle
import uuid

try:
import kombu
Expand Down Expand Up @@ -40,15 +41,17 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
'(Run "pip install kombu" in your '
'virtualenv).')
self.kombu = kombu.Connection(url)
self.queue = self.kombu.SimpleQueue(channel)
self.exchange = kombu.Exchange(channel, type='fanout', durable=False)
self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange)
super(KombuManager, self).__init__(channel=channel)

def _publish(self, data):
return self.queue.put(pickle.dumps(data))
with self.kombu.SimpleQueue(self.queue) as queue:
queue.put(pickle.dumps(data))

def _listen(self):
listen_queue = self.kombu.SimpleQueue(self.channel)
while True:
message = listen_queue.get(block=True)
message.ack()
yield message.payload
with self.kombu.SimpleQueue(self.queue) as queue:
while True:
message = queue.get(block=True)
message.ack()
yield message.payload

0 comments on commit 73fd499

Please sign in to comment.