Skip to content

Commit

Permalink
pre-commit and remove RmqCoordinator to tests/util only
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Jan 10, 2025
1 parent c822953 commit 42738fc
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 98 deletions.
2 changes: 1 addition & 1 deletion src/plumpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from .process_listener import *
from .process_states import *
from .processes import *
from .rmq import *
from .utils import *
from .workchains import *
from .rmq import *

__all__ = (
events.__all__
Expand Down
4 changes: 1 addition & 3 deletions src/plumpy/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,4 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr
# asyncio.run_coroutine_threadsafe(run_task(), loop)
# return future

return asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(coro(), loop)
)
return asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro(), loop))
2 changes: 0 additions & 2 deletions src/plumpy/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from plumpy.coordinator import Coordinator
from plumpy.exceptions import PersistenceError, TaskRejectedError

from plumpy.exceptions import PersistenceError, TaskRejectedError

from . import loaders, persistence
from .utils import PID_TYPE

Expand Down
2 changes: 2 additions & 0 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""The main Process module"""

from __future__ import annotations

import abc
import asyncio
import concurrent.futures
Expand Down
6 changes: 3 additions & 3 deletions src/plumpy/rmq/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> k

return converted


T = TypeVar('T', bound=kiwipy.Communicator)

def wrap_communicator(
communicator: T, loop: Optional[asyncio.AbstractEventLoop] = None
) -> 'LoopCommunicator[T]':

def wrap_communicator(communicator: T, loop: Optional[asyncio.AbstractEventLoop] = None) -> 'LoopCommunicator[T]':
"""
Wrap a communicator such that all callbacks made to any subscribers are scheduled on the
given event loop.
Expand Down
85 changes: 0 additions & 85 deletions src/plumpy/rmq/coordinator.py

This file was deleted.

3 changes: 2 additions & 1 deletion src/plumpy/rmq/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import kiwipy

__all__ = ['wrap_to_concurrent_future', 'unwrap_kiwi_future']
__all__ = ['unwrap_kiwi_future', 'wrap_to_concurrent_future']


def _convert_future_exc(exc):
Expand Down Expand Up @@ -112,6 +112,7 @@ def wrap_to_concurrent_future(future: asyncio.Future[Any]) -> kiwipy.Future:
_chain_future(future, new_future)
return new_future


# XXX: this required in aiida-core, see if really need this unwrap.
def unwrap_kiwi_future(future: kiwipy.Future) -> kiwipy.Future:
"""
Expand Down
25 changes: 22 additions & 3 deletions tests/rmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
# -*- coding: utf-8 -*-
from typing import Generic, TypeVar, final
import kiwipy
import concurrent.futures

from plumpy.exceptions import CoordinatorConnectionError


class RmqCoordinator:
def __init__(self, comm: kiwipy.Communicator):
U = TypeVar('U', bound=kiwipy.Communicator)

@final
class RmqCoordinator(Generic[U]):
def __init__(self, comm: U):
self._comm = comm

@property
def communicator(self) -> U:
"""The inner communicator."""
return self._comm

# XXX: naming - `add_receiver_rpc`
def add_rpc_subscriber(self, subscriber, identifier=None):
return self._comm.add_rpc_subscriber(subscriber, identifier)
Expand All @@ -18,9 +27,19 @@ def add_broadcast_subscriber(
self,
subscriber,
subject_filters=None,
sender_filters=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filters)
subscriber = kiwipy.BroadcastFilter(subscriber)

subject_filters = subject_filters or []
sender_filters = sender_filters or []

for filter in subject_filters:
subscriber.add_subject_filter(filter)
for filter in sender_filters:
subscriber.add_sender_filter(filter)

return self._comm.add_broadcast_subscriber(subscriber, identifier)

# XXX: naming - `add_reciver_task` (can be combined with two above maybe??)
Expand Down

0 comments on commit 42738fc

Please sign in to comment.