Skip to content

Commit

Permalink
CoordinatorTimeoutError and CoordinatorConnectionError as generic err…
Browse files Browse the repository at this point in the history
…or handler
  • Loading branch information
unkcpz committed Dec 17, 2024
1 parent aebec4f commit 4960836
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
'KilledError',
'PersistenceError',
'UnsuccessfulResult',
'CoordinatorConnectionError',
'CoordinatorTimeoutError',
]


Expand Down Expand Up @@ -42,3 +44,11 @@ class ClosedError(Exception):

class TaskRejectedError(Exception):
"""A task was rejected by the coordinacor"""


class CoordinatorConnectionError(ConnectionError):
"""Raised when coordinator cannot be connected"""


class CoordinatorTimeoutError(TimeoutError):
"""Raised when communicate with coordinator timeout"""
8 changes: 2 additions & 6 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,19 +741,15 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
call_with_super_check(self.on_killed)

if self._coordinator and isinstance(self.state, enum.Enum):
# FIXME: this part should be tested first
# FIXME: move all to `coordinator.broadcast()` call and in rmq implement coordinator
from plumpy.rmq.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed

from_label = cast(enum.Enum, from_state.LABEL).value if from_state is not None else None
subject = f'state_changed.{from_label}.{self.state.value}'
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._coordinator.broadcast_send(body=None, sender=self.pid, subject=subject)
except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError):
except exceptions.CoordinatorConnectionError:
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except concurrent.futures.TimeoutError:
except exceptions.CoordinatorTimeoutError:
message = 'Process<%s>: sending broadcast of state change from %s to %s timed out'
self.logger.warning(message, self.pid, from_label, self.state.value)

Expand Down
17 changes: 14 additions & 3 deletions tests/rmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@

# -*- coding: utf-8 -*-
import kiwipy
import concurrent.futures

from plumpy.exceptions import CoordinatorConnectionError, CoordinatorTimeoutError


class RmqCoordinator:
Expand Down Expand Up @@ -40,11 +43,19 @@ def broadcast_send(
subject=None,
correlation_id=None,
):
return self._comm.broadcast_send(body, sender, subject, correlation_id)
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed

try:
rsp = self._comm.broadcast_send(body, sender, subject, correlation_id)
except (ChannelInvalidStateError, ConnectionClosed) as exc:
raise CoordinatorConnectionError from exc
except concurrent.futures.TimeoutError as exc:
raise CoordinatorTimeoutError from exc
else:
return rsp

def task_send(self, task, no_reply=False):
return self._comm.task_send(task, no_reply)

def close(self):
self._comm.close()

0 comments on commit 4960836

Please sign in to comment.