Skip to content

Commit

Permalink
dummy approach for QueuedOutboundMessage
Browse files Browse the repository at this point in the history
Signed-off-by: Luis Gomez <luis.gomezalonso@sicpa.com>
  • Loading branch information
Luis Gomez committed Jun 7, 2021
1 parent faab034 commit e0a89e0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 36 deletions.
56 changes: 55 additions & 1 deletion aries_cloudagent/core/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import logging

from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence
from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence, Union

if TYPE_CHECKING: # To avoid circular import error
from .profile import Profile

from ..connections.models.connection_target import ConnectionTarget

LOGGER = logging.getLogger(__name__)


Expand All @@ -29,6 +31,11 @@ def payload(self):
"""Return this event's payload."""
return self._payload

@payload.setter
def payload(self, value):
"""Set this event's payload."""
self._payload = value

def __eq__(self, other):
"""Test equality."""
if not isinstance(other, Event):
Expand All @@ -40,6 +47,53 @@ def __repr__(self):
return "<Event topic={}, payload={}>".format(self._topic, self._payload)


class QueuedOutboundMessage(Event):
"""Class representing an outbound message for EventBus"""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

@property
def topic(self):
"""Return this event's topic."""
return self._topic

@topic.setter
def topic(self, value):
"""Set this event's Topic."""
self._topic = f"outbound/message/target/{value}"

def __init__(
self,
profile: Profile,
message: Any,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
# TODO: task logic should be implemented in another way
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None
topic = f"outbound/message/did/{target.did}"
payload = message
super().__init__(topic, payload)


class EventBus:
"""A simple event bus implementation."""

Expand Down
45 changes: 10 additions & 35 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Callable, Type, Union
from urllib.parse import urlparse

from ...connections.models.connection_target import ConnectionTarget
from ...config.injection_context import InjectionContext
from ...core.profile import Profile
from ...utils.classloader import ClassLoader, ModuleLoadError, ClassNotFoundError
Expand All @@ -26,43 +25,12 @@
)
from .message import OutboundMessage

from ...core.event_bus import QueuedOutboundMessage

LOGGER = logging.getLogger(__name__)
MODULE_BASE_PATH = "aries_cloudagent.transport.outbound"


class QueuedOutboundMessage:
"""Class representing an outbound message pending delivery."""

STATE_NEW = "new"
STATE_PENDING = "pending"
STATE_ENCODE = "encode"
STATE_DELIVER = "deliver"
STATE_RETRY = "retry"
STATE_DONE = "done"

def __init__(
self,
profile: Profile,
message: OutboundMessage,
target: ConnectionTarget,
transport_id: str,
):
"""Initialize the queued outbound message."""
self.profile = profile
self.endpoint = target and target.endpoint
self.error: Exception = None
self.message = message
self.payload: Union[str, bytes] = None
self.retries = None
self.retry_at: float = None
self.state = self.STATE_NEW
self.target = target
self.task: asyncio.Task = None
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None


class OutboundTransportManager:
"""Outbound transport manager class."""

Expand Down Expand Up @@ -254,9 +222,9 @@ def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
break
if not transport_id:
raise OutboundDeliveryError("No supported transport for outbound message")

queued = QueuedOutboundMessage(profile, outbound, target, transport_id)
queued.retries = self.MAX_RETRY_COUNT
# TODO: QueuedOutboundMessage usage
self.outbound_new.append(queued)
self.process_queued()

Expand Down Expand Up @@ -290,10 +258,12 @@ def enqueue_webhook(
api_key = endpoint_hash_split[1]
queued.api_key = api_key
queued.endpoint = f"{endpoint}/topic/{topic}/"
queued.topic = topic
queued.metadata = metadata
queued.payload = json.dumps(payload)
queued.state = QueuedOutboundMessage.STATE_PENDING
queued.retries = 4 if max_attempts is None else max_attempts - 1
# TODO: QueuedOutboundMessage usage
self.outbound_new.append(queued)
self.process_queued()

Expand Down Expand Up @@ -413,6 +383,7 @@ async def _process_loop(self):
else:
break

# TODO: QueuedOutboundMessage usage
def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
"""Kick off encoding of a queued message."""
queued.task = self.task_queue.run(
Expand All @@ -421,6 +392,7 @@ def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
)
return queued.task

# TODO: QueuedOutboundMessage usage
async def perform_encode(self, queued: QueuedOutboundMessage):
"""Perform message encoding."""
transport = self.get_transport_instance(queued.transport_id)
Expand All @@ -434,6 +406,7 @@ async def perform_encode(self, queued: QueuedOutboundMessage):
queued.target.sender_key,
)

# TODO: QueuedOutboundMessage usage
def finished_encode(self, queued: QueuedOutboundMessage, completed: CompletedTask):
"""Handle completion of queued message encoding."""
if completed.exc_info:
Expand All @@ -444,6 +417,7 @@ def finished_encode(self, queued: QueuedOutboundMessage, completed: CompletedTas
queued.task = None
self.process_queued()

# TODO: QueuedOutboundMessage usage
def deliver_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
"""Kick off delivery of a queued message."""
transport = self.get_transport_instance(queued.transport_id)
Expand All @@ -459,6 +433,7 @@ def deliver_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:
)
return queued.task

# TODO: QueuedOutboundMessage usage
def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTask):
"""Handle completion of queued message delivery."""
if completed.exc_info:
Expand Down

0 comments on commit e0a89e0

Please sign in to comment.