Skip to content

Commit ec5b188

Browse files
committed
add try/except
1 parent 47b2f39 commit ec5b188

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

rabbitmq/rabbitmq.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ def close(self):
4242
finally:
4343
self.is_open = False
4444

45+
def __repr__(self):
46+
return self.amqp_url
47+
4548

4649
class MQProducer(RabbitMQ):
4750

@@ -58,16 +61,19 @@ def publish(self, exchange: str, routing_key: str, body: str,
5861

5962
class MQConsumer(RabbitMQ):
6063

61-
def open(self):
64+
def open(self, prefetch: int = 200):
65+
"""
66+
:param prefetch: prefetch count of message
67+
"""
6268
super(MQConsumer, self).open()
63-
self.channel.basic_qos(prefetch_count=1)
69+
self.channel.basic_qos(prefetch_count=prefetch)
6470

6571
def consume(self, queue: str):
6672
for method_frame, properties, body in self.channel.consume(queue):
6773
return method_frame, properties, body.decode('utf-8')
6874

6975
def basic_consume(self, queue: str, on_message_callback: typing.Callable):
70-
self.channel.basic_consume(on_message_callback, queue=queue)
76+
self.channel.basic_consume(queue, on_message_callback)
7177

7278
def start_consuming(self):
7379
self.channel.start_consuming()

rabbitmq/thread_comsume.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@ def __init__(self, conf: dict):
3535
# self.mq_consumer.open()
3636

3737
def run(self):
38-
self.mq_consumer.open()
39-
queue_name = f"test-{self.exchange}"
40-
self.mq_consumer.bind(self.exchange, self.routing_key, queue_name)
41-
self.mq_consumer.basic_consume(queue_name, self.callback_rabbit)
42-
logger.warning(
43-
f"binding from {self.exchange}, routing_key={self.routing_key}, queue={queue_name}")
44-
self.mq_consumer.start_consuming()
38+
try:
39+
self.mq_consumer.open()
40+
queue_name = f"test-{self.exchange}"
41+
self.mq_consumer.bind(self.exchange, self.routing_key, queue_name)
42+
self.mq_consumer.basic_consume(queue_name, self.callback_rabbit)
43+
logger.warning(
44+
f"binding from {self.exchange}, routing_key={self.routing_key}, queue={queue_name}")
45+
self.mq_consumer.start_consuming()
46+
except KeyboardInterrupt:
47+
pass
48+
finally:
49+
self.mq_consumer.close()
4550

4651
@classmethod
4752
def callback_rabbit(cls, ch, method, properties, body):

0 commit comments

Comments
 (0)