@@ -30,8 +30,9 @@ class RabbitMQTransport(LoggingMixin, BaseTransport):
30
30
def clean_connection (cls ):
31
31
if cls ._producer_connection and not cls ._producer_connection .is_closed :
32
32
cls ._producer_connection .close ()
33
- cls ._producer_connection = None
34
- cls ._producer_channel = None
33
+
34
+ cls ._producer_connection = None
35
+ cls ._producer_channel = None
35
36
36
37
@classmethod
37
38
def consume (cls ):
@@ -57,24 +58,34 @@ def consume(cls):
57
58
58
59
@classmethod
59
60
def produce (cls , payload ):
60
- # TODO: try to produce and reconnect several times, now leave as before
61
- # if cannot publish message - drop it and try to reconnect on next event
62
- rmq_settings = cls ._get_common_settings ()
63
- exchange = rmq_settings [- 1 ]
64
-
65
61
try :
66
- # Decided not to create context-manager to stay within the class
67
- _ , channel = cls ._get_producer_rmq_objects (* rmq_settings )
68
-
69
- cls ._produce_message (channel , exchange , payload )
70
- cls .log_produced (payload )
62
+ cls ._produce (payload )
71
63
except (exceptions .AMQPError , exceptions .ChannelError , exceptions .ReentrancyError ):
72
- logger .error ("CQRS couldn't be published: pk = {} ({})." .format (
64
+ logger .error ("CQRS couldn't be published: pk = {} ({}). Reconnect... " .format (
73
65
payload .pk , payload .cqrs_id ,
74
66
))
75
67
76
68
# in case of any error - close connection and try to reconnect
77
69
cls .clean_connection ()
70
+ # reconnect at least 1 time
71
+ try :
72
+ cls ._produce (payload )
73
+ except (exceptions .AMQPError , exceptions .ChannelError , exceptions .ReentrancyError ):
74
+ logger .error ("CQRS couldn't be published: pk = {} ({})." .format (
75
+ payload .pk , payload .cqrs_id ,
76
+ ))
77
+
78
+ cls .clean_connection ()
79
+
80
+ @classmethod
81
+ def _produce (cls , payload ):
82
+ rmq_settings = cls ._get_common_settings ()
83
+ exchange = rmq_settings [- 1 ]
84
+ # Decided not to create context-manager to stay within the class
85
+ _ , channel = cls ._get_producer_rmq_objects (* rmq_settings )
86
+
87
+ cls ._produce_message (channel , exchange , payload )
88
+ cls .log_produced (payload )
78
89
79
90
@classmethod
80
91
def _consume_message (cls , ch , method , properties , body ):
0 commit comments