Skip to content

Commit

Permalink
Merge pull request openwallet-foundation#98 from Luis-GA/feature/OutB…
Browse files Browse the repository at this point in the history
…oundMessageBus

Feature/out bound message bus
  • Loading branch information
Luis-GA authored Jun 8, 2021
2 parents e0a89e0 + 9ea55e9 commit a08e7ac
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 57 deletions.
66 changes: 63 additions & 3 deletions aries_cloudagent/core/event_bus.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A simple event bus."""

import logging

import re
from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence, Union

Expand Down Expand Up @@ -47,6 +47,11 @@ def __repr__(self):
return "<Event topic={}, payload={}>".format(self._topic, self._payload)


EVENT_PATTERN_QUEUEDOUTBOUNDMESSAGE = re.compile("^acapy::queuedoutboundMessage::(.*)$")
EVENT_PATTERN_OUTBOUNDMESSAGE = re.compile("^acapy::outboundMessage::(.*)$")
"""Outbound message representation."""


class QueuedOutboundMessage(Event):
"""Class representing an outbound message for EventBus"""

Expand All @@ -65,7 +70,7 @@ def topic(self):
@topic.setter
def topic(self, value):
"""Set this event's Topic."""
self._topic = f"outbound/message/target/{value}"
self._topic = f"outbound::queuedmessage::target::{value}"

def __init__(
self,
Expand All @@ -89,11 +94,66 @@ def __init__(
self.transport_id: str = transport_id
self.metadata: dict = None
self.api_key: str = None
topic = f"outbound/message/did/{target.did}"
topic = f"outbound::queuedmessage::did::{target.did}"
payload = message
super().__init__(topic, payload)


class OutboundMessage(Event):
"""Represents an outgoing message."""

@property
def topic(self):
"""Return this event's topic."""
return self._topic

@topic.setter
def topic(self, value):
"""Set this event's Topic."""
self._topic = f"outbound::message::target::{value}"

def __init__(
self,
*,
connection_id: str = None,
enc_payload: Union[str, bytes] = None,
endpoint: str = None,
payload: Union[str, bytes],
reply_session_id: str = None,
reply_thread_id: str = None,
reply_to_verkey: str = None,
reply_from_verkey: str = None,
target: ConnectionTarget = None,
target_list: Sequence[ConnectionTarget] = None,
to_session_only: bool = False,
):
"""Initialize an outgoing message."""
self.connection_id = connection_id
self.enc_payload = enc_payload
self._endpoint = endpoint
self.payload = payload
self.reply_session_id = reply_session_id
self.reply_thread_id = reply_thread_id
self.reply_to_verkey = reply_to_verkey
self.reply_from_verkey = reply_from_verkey
self.target = target
self.target_list = list(target_list) if target_list else []
self.to_session_only = to_session_only
topic = f"outbound::message::did::{target.did}"
payload = message
super().__init__(topic, payload)

def __repr__(self) -> str:
"""
Return a human readable representation of this class.
Returns:
A human readable string for this class
"""
items = ("{}={}".format(k, repr(v)) for k, v in self.__dict__.items())
return "<{}({})>".format(self.__class__.__name__, ", ".join(items))

class EventBus:
"""A simple event bus implementation."""

Expand Down
45 changes: 39 additions & 6 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@
OutboundDeliveryError,
OutboundTransportRegistrationError,
)
from .message import OutboundMessage

from ...core.event_bus import QueuedOutboundMessage
from ...core.event_bus import (
QueuedOutboundMessage,
Event,
EventBus,
EVENT_PATTERN_QUEUEDOUTBOUNDMESSAGE,
EVENT_PATTERN_OUTBOUNDMESSAGE,
OutboundMessage
)

LOGGER = logging.getLogger(__name__)
MODULE_BASE_PATH = "aries_cloudagent.transport.outbound"
Expand Down Expand Up @@ -68,6 +74,10 @@ async def setup(self):
)
for outbound_transport in outbound_transports:
self.register(outbound_transport)
event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_QUEUEDOUTBOUNDMESSAGE, self._enqueue)
event_bus.subscribe(EVENT_PATTERN_OUTBOUNDMESSAGE, self._enqueue)

def register(self, module: str) -> str:
"""
Expand Down Expand Up @@ -202,7 +212,13 @@ def get_transport_instance(self, transport_id: str) -> BaseOutboundTransport:
"""Get an instance of a running transport by ID."""
return self.running_transports[transport_id]

def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
def _enqueue(self, profile: Profile, event: Event):
match = EVENT_PATTERN_QUEUEDOUTBOUNDMESSAGE.search(event.topic)
topic = match.group(1) if match else None
if topic:
self._enqueue_message(profile, event.payload)

def _enqueue_message(self, profile: Profile, outbound: OutboundMessage):
"""
Add an outbound message to the queue.
Expand All @@ -224,8 +240,13 @@ def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
raise OutboundDeliveryError("No supported transport for outbound message")
queued = QueuedOutboundMessage(profile, outbound, target, transport_id)
queued.retries = self.MAX_RETRY_COUNT
# TODO: QueuedOutboundMessage usage
self.outbound_new.append(queued)
# TODO: QueuedOutboundMessage usage
event_bus = self.context.inject(EventBus, required=False)
if event_bus:
# OutboundMessage is a extension of Event
await event_bus.notify(self, outbound)

self.process_queued()

def enqueue_webhook(
Expand Down Expand Up @@ -265,9 +286,21 @@ def enqueue_webhook(
queued.retries = 4 if max_attempts is None else max_attempts - 1
# TODO: QueuedOutboundMessage usage
self.outbound_new.append(queued)
self.process_queued()
event_bus = self.context.inject(EventBus, required=False)
if event_bus:
# QueuedOutboundMessage is a extension of Event
await event_bus.notify(self, queued)


#self.process_queued()

def _process(self, profile: Profile, event: Event):
match = EVENT_PATTERN_OUTBOUNDMESSAGE.search(event.topic)
topic = match.group(1) if match else None
if topic:
self._process_queued()

def process_queued(self) -> asyncio.Task:
def _process_queued(self) -> asyncio.Task:
"""
Start the process to deliver queued messages if necessary.
Expand Down
48 changes: 0 additions & 48 deletions aries_cloudagent/transport/outbound/message.py

This file was deleted.

0 comments on commit a08e7ac

Please sign in to comment.