diff --git a/dj_cqrs/management/commands/cqrs_consume.py b/dj_cqrs/management/commands/cqrs_consume.py index 152aae9..2d2d94c 100644 --- a/dj_cqrs/management/commands/cqrs_consume.py +++ b/dj_cqrs/management/commands/cqrs_consume.py @@ -12,7 +12,7 @@ from dj_cqrs.registries import ReplicaRegistry -logger = logging.getLogger('django_cqrs.cqrs_consume') +logger = logging.getLogger('django-cqrs') def consume(**kwargs): @@ -20,7 +20,17 @@ def consume(**kwargs): django.setup() from dj_cqrs.transport import current_transport - current_transport.consume(**kwargs) + try: + current_transport.consume(**kwargs) + except KeyboardInterrupt: + pass + + +def _display_path(path): + try: + return f'"{path.relative_to(Path.cwd())}"' + except ValueError: # pragma: no cover + return f'"{path}"' class WorkersManager: @@ -65,6 +75,10 @@ def run(self): if self.reload: for files_changed in self: if files_changed: + logger.warning( + 'Detected changes in %s. Reloading...', + ', '.join(map(_display_path, files_changed)), + ) self.restart() else: self.stop_event.wait() @@ -80,11 +94,13 @@ def start(self): self.consume_kwargs, ) self.pool.append(process) + logger.info(f'Consumer process with pid {process.pid} started') def terminate(self, *args, **kwargs): while self.pool: process = self.pool.pop() process.stop(sigint_timeout=self.sigint_timeout, sigkill_timeout=self.sigkill_timeout) + logger.info(f'Consumer process with pid {process.pid} stopped.') def restart(self, *args, **kwargs): self.terminate()