From e0a89e02c3582e86ddf25e2e95ef123572907bbf Mon Sep 17 00:00:00 2001 From: Luis Gomez Date: Mon, 7 Jun 2021 16:22:38 +0200 Subject: [PATCH] dummy approach for QueuedOutboundMessage Signed-off-by: Luis Gomez --- aries_cloudagent/core/event_bus.py | 56 ++++++++++++++++++- .../transport/outbound/manager.py | 45 ++++----------- 2 files changed, 65 insertions(+), 36 deletions(-) diff --git a/aries_cloudagent/core/event_bus.py b/aries_cloudagent/core/event_bus.py index 1855b33ab5..43117e6dd2 100644 --- a/aries_cloudagent/core/event_bus.py +++ b/aries_cloudagent/core/event_bus.py @@ -3,11 +3,13 @@ import logging from itertools import chain -from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence +from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence, Union if TYPE_CHECKING: # To avoid circular import error from .profile import Profile +from ..connections.models.connection_target import ConnectionTarget + LOGGER = logging.getLogger(__name__) @@ -29,6 +31,11 @@ def payload(self): """Return this event's payload.""" return self._payload + @payload.setter + def payload(self, value): + """Set this event's payload.""" + self._payload = value + def __eq__(self, other): """Test equality.""" if not isinstance(other, Event): @@ -40,6 +47,53 @@ def __repr__(self): return "".format(self._topic, self._payload) +class QueuedOutboundMessage(Event): + """Class representing an outbound message for EventBus""" + + STATE_NEW = "new" + STATE_PENDING = "pending" + STATE_ENCODE = "encode" + STATE_DELIVER = "deliver" + STATE_RETRY = "retry" + STATE_DONE = "done" + + @property + def topic(self): + """Return this event's topic.""" + return self._topic + + @topic.setter + def topic(self, value): + """Set this event's Topic.""" + self._topic = f"outbound/message/target/{value}" + + def __init__( + self, + profile: Profile, + message: Any, + target: ConnectionTarget, + transport_id: str, + ): + """Initialize the queued outbound message.""" + self.profile = profile + self.endpoint = target and target.endpoint + self.error: Exception = None + self.message = message + self.payload = None + self.retries = None + self.retry_at: float = None + self.state = self.STATE_NEW + self.target = target + # TODO: task logic should be implemented in another way + self.task: asyncio.Task = None + self.transport_id: str = transport_id + self.metadata: dict = None + self.api_key: str = None + topic = f"outbound/message/did/{target.did}" + payload = message + super().__init__(topic, payload) + + class EventBus: """A simple event bus implementation.""" diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 58a1a0d3e7..405a3a6860 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -8,7 +8,6 @@ from typing import Callable, Type, Union from urllib.parse import urlparse -from ...connections.models.connection_target import ConnectionTarget from ...config.injection_context import InjectionContext from ...core.profile import Profile from ...utils.classloader import ClassLoader, ModuleLoadError, ClassNotFoundError @@ -26,43 +25,12 @@ ) from .message import OutboundMessage +from ...core.event_bus import QueuedOutboundMessage + LOGGER = logging.getLogger(__name__) MODULE_BASE_PATH = "aries_cloudagent.transport.outbound" -class QueuedOutboundMessage: - """Class representing an outbound message pending delivery.""" - - STATE_NEW = "new" - STATE_PENDING = "pending" - STATE_ENCODE = "encode" - STATE_DELIVER = "deliver" - STATE_RETRY = "retry" - STATE_DONE = "done" - - def __init__( - self, - profile: Profile, - message: OutboundMessage, - target: ConnectionTarget, - transport_id: str, - ): - """Initialize the queued outbound message.""" - self.profile = profile - self.endpoint = target and target.endpoint - self.error: Exception = None - self.message = message - self.payload: Union[str, bytes] = None - self.retries = None - self.retry_at: float = None - self.state = self.STATE_NEW - self.target = target - self.task: asyncio.Task = None - self.transport_id: str = transport_id - self.metadata: dict = None - self.api_key: str = None - - class OutboundTransportManager: """Outbound transport manager class.""" @@ -254,9 +222,9 @@ def enqueue_message(self, profile: Profile, outbound: OutboundMessage): break if not transport_id: raise OutboundDeliveryError("No supported transport for outbound message") - queued = QueuedOutboundMessage(profile, outbound, target, transport_id) queued.retries = self.MAX_RETRY_COUNT + # TODO: QueuedOutboundMessage usage self.outbound_new.append(queued) self.process_queued() @@ -290,10 +258,12 @@ def enqueue_webhook( api_key = endpoint_hash_split[1] queued.api_key = api_key queued.endpoint = f"{endpoint}/topic/{topic}/" + queued.topic = topic queued.metadata = metadata queued.payload = json.dumps(payload) queued.state = QueuedOutboundMessage.STATE_PENDING queued.retries = 4 if max_attempts is None else max_attempts - 1 + # TODO: QueuedOutboundMessage usage self.outbound_new.append(queued) self.process_queued() @@ -413,6 +383,7 @@ async def _process_loop(self): else: break + # TODO: QueuedOutboundMessage usage def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: """Kick off encoding of a queued message.""" queued.task = self.task_queue.run( @@ -421,6 +392,7 @@ def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: ) return queued.task + # TODO: QueuedOutboundMessage usage async def perform_encode(self, queued: QueuedOutboundMessage): """Perform message encoding.""" transport = self.get_transport_instance(queued.transport_id) @@ -434,6 +406,7 @@ async def perform_encode(self, queued: QueuedOutboundMessage): queued.target.sender_key, ) + # TODO: QueuedOutboundMessage usage def finished_encode(self, queued: QueuedOutboundMessage, completed: CompletedTask): """Handle completion of queued message encoding.""" if completed.exc_info: @@ -444,6 +417,7 @@ def finished_encode(self, queued: QueuedOutboundMessage, completed: CompletedTas queued.task = None self.process_queued() + # TODO: QueuedOutboundMessage usage def deliver_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: """Kick off delivery of a queued message.""" transport = self.get_transport_instance(queued.transport_id) @@ -459,6 +433,7 @@ def deliver_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: ) return queued.task + # TODO: QueuedOutboundMessage usage def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTask): """Handle completion of queued message delivery.""" if completed.exc_info: