Skip to content

Commit

Permalink
Merge pull request #1063 from Indicio-tech/feature/event-bus
Browse files Browse the repository at this point in the history
Feature/event bus
  • Loading branch information
andrewwhitehead authored Apr 27, 2021
2 parents 8a60d1a + 708f9d3 commit aa9a5c9
Show file tree
Hide file tree
Showing 39 changed files with 524 additions and 317 deletions.
20 changes: 0 additions & 20 deletions aries_cloudagent/admin/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@


from abc import ABC, abstractmethod
from typing import Sequence

from ..core.profile import Profile


class BaseAdminServer(ABC):
Expand All @@ -23,20 +20,3 @@ async def start(self) -> None:
@abstractmethod
async def stop(self) -> None:
"""Stop the webserver."""

@abstractmethod
def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""

@abstractmethod
def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""

@abstractmethod
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
121 changes: 52 additions & 69 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,57 @@
"""Admin server classes."""

import asyncio
from hmac import compare_digest
import logging
import re
from typing import Callable, Coroutine
import uuid
import warnings

from typing import Callable, Coroutine, Sequence, Set

import aiohttp_cors
import jwt

from hmac import compare_digest
from aiohttp import web
from aiohttp_apispec import (
docs,
response_schema,
setup_aiohttp_apispec,
validation_middleware,
)

import aiohttp_cors
import jwt
from marshmallow import fields

from ..config.injection_context import InjectionContext
from ..core.profile import Profile
from ..core.event_bus import Event, EventBus
from ..core.plugin_registry import PluginRegistry
from ..core.profile import Profile
from ..ledger.error import LedgerConfigError, LedgerTransactionError
from ..messaging.models.openapi import OpenAPISchema
from ..messaging.responder import BaseResponder
from ..transport.queue.basic import BasicMessageQueue
from ..multitenant.manager import MultitenantManager, MultitenantManagerError
from ..storage.error import StorageNotFoundError
from ..transport.outbound.message import OutboundMessage
from ..transport.queue.basic import BasicMessageQueue
from ..utils.stats import Collector
from ..utils.task_queue import TaskQueue
from ..version import __version__
from ..multitenant.manager import MultitenantManager, MultitenantManagerError

from ..storage.error import StorageNotFoundError
from .base_server import BaseAdminServer
from .error import AdminSetupError
from .request_context import AdminRequestContext


LOGGER = logging.getLogger(__name__)

EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")

EVENT_WEBHOOK_MAPPING = {
"acapy::basicmessage::received": "basicmessages",
"acapy::problem_report": "problem_report",
"acapy::ping::received": "ping",
"acapy::ping::response_received": "ping",
"acapy::actionmenu::received": "actionmenu",
"acapy::actionmenu::get-active-menu": "get-active-menu",
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
}


class AdminModulesSchema(OpenAPISchema):
"""Schema for the modules endpoint."""
Expand Down Expand Up @@ -93,7 +103,6 @@ def __init__(
self,
profile: Profile,
send: Coroutine,
webhook: Coroutine,
**kwargs,
):
"""
Expand All @@ -106,7 +115,6 @@ def __init__(
super().__init__(**kwargs)
self._profile = profile
self._send = send
self._webhook = webhook

async def send_outbound(self, message: OutboundMessage):
"""
Expand All @@ -119,53 +127,23 @@ async def send_outbound(self, message: OutboundMessage):

async def send_webhook(self, topic: str, payload: dict):
"""
Dispatch a webhook.
Dispatch a webhook. DEPRECATED: use the event bus instead.
Args:
topic: the webhook topic identifier
payload: the webhook payload value
"""
await self._webhook(self._profile, topic, payload)
warnings.warn(
"responder.send_webhook is deprecated; please use the event bus instead.",
DeprecationWarning,
)
await self._profile.notify("acapy::webhook::" + topic, payload)

@property
def send_fn(self) -> Coroutine:
"""Accessor for async function to send outbound message."""
return self._send

@property
def webhook_fn(self) -> Coroutine:
"""Accessor for the async function to dispatch a webhook."""
return self._webhook


class WebhookTarget:
"""Class for managing webhook target information."""

def __init__(
self,
endpoint: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Initialize the webhook target."""
self.endpoint = endpoint
self.max_attempts = max_attempts
self._topic_filter = None
self.topic_filter = topic_filter # call setter

@property
def topic_filter(self) -> Set[str]:
"""Accessor for the target's topic filter."""
return self._topic_filter

@topic_filter.setter
def topic_filter(self, val: Sequence[str]):
"""Setter for the target's topic filter."""
filt = set(val) if val else None
if filt and "*" in filt:
filt = None
self._topic_filter = filt


@web.middleware
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
Expand Down Expand Up @@ -270,7 +248,6 @@ def __init__(
self.root_profile = root_profile
self.task_queue = task_queue
self.webhook_router = webhook_router
self.webhook_targets = {}
self.websocket_queues = {}
self.site = None
self.multitenant_manager = context.inject(MultitenantManager, required=False)
Expand Down Expand Up @@ -371,7 +348,6 @@ async def setup_context(request: web.Request, handler):
responder = AdminResponder(
profile,
self.outbound_message_router,
self.send_webhook,
)
profile.context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -472,6 +448,19 @@ def sort_dict(raw: dict) -> dict:
if plugin_registry:
plugin_registry.post_process_routes(self.app)

event_bus = self.context.inject(EventBus, required=False)
if event_bus:
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
re.compile(re.escape(event_topic)),
lambda profile, event, webhook_topic=webhook_topic: self.send_webhook(
profile, webhook_topic, event.payload
),
)

# order tags alphabetically, parameters deterministically and pythonically
swagger_dict = self.app._state["swagger_dict"]
swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
Expand Down Expand Up @@ -799,21 +788,17 @@ async def websocket_handler(self, request):

return ws

def add_webhook_target(
self,
target_url: str,
topic_filter: Sequence[str] = None,
max_attempts: int = None,
):
"""Add a webhook target."""
self.webhook_targets[target_url] = WebhookTarget(
target_url, topic_filter, max_attempts
)
async def __on_webhook_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

def remove_webhook_target(self, target_url: str):
"""Remove a webhook target."""
if target_url in self.webhook_targets:
del self.webhook_targets[target_url]
async def __on_record_event(self, profile: Profile, event: Event):
match = EVENT_PATTERN_RECORD.search(event.topic)
webhook_topic = match.group(1) if match else None
if webhook_topic:
await self.send_webhook(profile, webhook_topic, event.payload)

async def send_webhook(self, profile: Profile, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
Expand All @@ -825,8 +810,6 @@ async def send_webhook(self, profile: Profile, topic: str, payload: dict):
metadata = {"x-wallet-id": wallet_id}

if self.webhook_router:
# for idx, target in self.webhook_targets.items():
# if not target.topic_filter or topic in target.topic_filter:
for endpoint in webhook_urls:
self.webhook_router(
topic,
Expand Down
33 changes: 0 additions & 33 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,6 @@
from ..server import AdminServer, AdminSetupError


class TestAdminResponder(AsyncTestCase):
async def test_admin_responder(self):
admin_responder = test_module.AdminResponder(
None, async_mock.CoroutineMock(), async_mock.CoroutineMock()
)

assert admin_responder.send_fn is admin_responder._send
assert admin_responder.webhook_fn is admin_responder._webhook

message = test_module.OutboundMessage(payload="hello")
await admin_responder.send_outbound(message)
assert admin_responder._send.called_once_with(None, message)

await admin_responder.send_webhook("topic", {"payload": "hello"})
assert admin_responder._webhook.called_once_with("topic", {"outbound": "hello"})


class TestWebhookTarget(AsyncTestCase):
async def test_webhook_target(self):
webhook_target = test_module.WebhookTarget(
endpoint="localhost:8888",
topic_filter=["birthdays", "animal videos"],
max_attempts=None,
)
assert webhook_target.topic_filter == {"birthdays", "animal videos"}

webhook_target.topic_filter = []
assert webhook_target.topic_filter is None

webhook_target.topic_filter = ["duct cleaning", "*"]
assert webhook_target.topic_filter is None


class TestAdminServer(AsyncTestCase):
async def setUp(self):
self.message_results = []
Expand Down
4 changes: 4 additions & 0 deletions aries_cloudagent/config/default_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ..cache.base import BaseCache
from ..cache.in_memory import InMemoryCache
from ..core.event_bus import EventBus
from ..core.plugin_registry import PluginRegistry
from ..core.profile import ProfileManager, ProfileManagerProvider
from ..core.protocol_registry import ProtocolRegistry
Expand Down Expand Up @@ -42,6 +43,9 @@ async def build_context(self) -> InjectionContext:
# Global protocol registry
context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry())

# Global event bus
context.injector.bind_instance(EventBus, EventBus())

# Global did resolver registry
did_resolver_registry = DIDResolverRegistry()
context.injector.bind_instance(DIDResolverRegistry, did_resolver_registry)
Expand Down
2 changes: 1 addition & 1 deletion aries_cloudagent/connections/models/conn_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool:
return self is ConnRecord.State.get(other)

RECORD_ID_NAME = "connection_id"
WEBHOOK_TOPIC = "connections"
RECORD_TOPIC = "connections"
LOG_STATE_FLAG = "debug.connections"
TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"}

Expand Down
6 changes: 0 additions & 6 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ async def setup(self):
self.dispatcher.task_queue,
self.get_stats,
)
webhook_urls = context.settings.get("admin.webhook_urls")
if webhook_urls:
for url in webhook_urls:
self.admin_server.add_webhook_target(url)
context.injector.bind_instance(BaseAdminServer, self.admin_server)
except Exception:
LOGGER.exception("Unable to register admin server")
Expand Down Expand Up @@ -206,7 +202,6 @@ async def start(self) -> None:
responder = AdminResponder(
self.root_profile,
self.admin_server.outbound_message_router,
self.admin_server.send_webhook,
)
context.injector.bind_instance(BaseResponder, responder)

Expand Down Expand Up @@ -398,7 +393,6 @@ def inbound_message_router(
profile,
message,
self.outbound_message_router,
self.admin_server and self.admin_server.send_webhook,
lambda completed: self.dispatch_complete(message, completed),
)
except (LedgerConfigError, LedgerTransactionError) as e:
Expand Down
Loading

0 comments on commit aa9a5c9

Please sign in to comment.