12
12
from dj_cqrs .registries import ReplicaRegistry
13
13
14
14
15
- logger = logging .getLogger ('django_cqrs.cqrs_consume ' )
15
+ logger = logging .getLogger ('django-cqrs ' )
16
16
17
17
18
18
def consume (** kwargs ):
19
19
import django
20
20
django .setup ()
21
21
22
22
from dj_cqrs .transport import current_transport
23
- current_transport .consume (** kwargs )
23
+ try :
24
+ current_transport .consume (** kwargs )
25
+ except KeyboardInterrupt :
26
+ pass
27
+
28
+
29
+ def _display_path (path ):
30
+ try :
31
+ return f'"{ path .relative_to (Path .cwd ())} "'
32
+ except ValueError : # pragma: no cover
33
+ return f'"{ path } "'
24
34
25
35
26
36
class WorkersManager :
@@ -65,6 +75,10 @@ def run(self):
65
75
if self .reload :
66
76
for files_changed in self :
67
77
if files_changed :
78
+ logger .warning (
79
+ 'Detected changes in %s. Reloading...' ,
80
+ ', ' .join (map (_display_path , files_changed )),
81
+ )
68
82
self .restart ()
69
83
else :
70
84
self .stop_event .wait ()
@@ -80,11 +94,13 @@ def start(self):
80
94
self .consume_kwargs ,
81
95
)
82
96
self .pool .append (process )
97
+ logger .info (f'Consumer process with pid { process .pid } started' )
83
98
84
99
def terminate (self , * args , ** kwargs ):
85
100
while self .pool :
86
101
process = self .pool .pop ()
87
102
process .stop (sigint_timeout = self .sigint_timeout , sigkill_timeout = self .sigkill_timeout )
103
+ logger .info (f'Consumer process with pid { process .pid } stopped.' )
88
104
89
105
def restart (self , * args , ** kwargs ):
90
106
self .terminate ()
0 commit comments