diff --git a/aries_cloudagent/admin/base_server.py b/aries_cloudagent/admin/base_server.py index d6e4673f79..a05b238017 100644 --- a/aries_cloudagent/admin/base_server.py +++ b/aries_cloudagent/admin/base_server.py @@ -2,9 +2,6 @@ from abc import ABC, abstractmethod -from typing import Sequence - -from ..core.profile import Profile class BaseAdminServer(ABC): @@ -23,20 +20,3 @@ async def start(self) -> None: @abstractmethod async def stop(self) -> None: """Stop the webserver.""" - - @abstractmethod - def add_webhook_target( - self, - target_url: str, - topic_filter: Sequence[str] = None, - max_attempts: int = None, - ): - """Add a webhook target.""" - - @abstractmethod - def remove_webhook_target(self, target_url: str): - """Remove a webhook target.""" - - @abstractmethod - async def send_webhook(self, profile: Profile, topic: str, payload: dict): - """Add a webhook to the queue, to send to all registered targets.""" diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index abdf7469b4..4c6e5a0a81 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -1,16 +1,13 @@ """Admin server classes.""" import asyncio +from hmac import compare_digest import logging import re +from typing import Callable, Coroutine import uuid +import warnings -from typing import Callable, Coroutine, Sequence, Set - -import aiohttp_cors -import jwt - -from hmac import compare_digest from aiohttp import web from aiohttp_apispec import ( docs, @@ -18,30 +15,43 @@ setup_aiohttp_apispec, validation_middleware, ) - +import aiohttp_cors +import jwt from marshmallow import fields from ..config.injection_context import InjectionContext -from ..core.profile import Profile +from ..core.event_bus import Event, EventBus from ..core.plugin_registry import PluginRegistry +from ..core.profile import Profile from ..ledger.error import LedgerConfigError, LedgerTransactionError from ..messaging.models.openapi import OpenAPISchema from ..messaging.responder import BaseResponder -from ..transport.queue.basic import BasicMessageQueue +from ..multitenant.manager import MultitenantManager, MultitenantManagerError +from ..storage.error import StorageNotFoundError from ..transport.outbound.message import OutboundMessage +from ..transport.queue.basic import BasicMessageQueue from ..utils.stats import Collector from ..utils.task_queue import TaskQueue from ..version import __version__ -from ..multitenant.manager import MultitenantManager, MultitenantManagerError - -from ..storage.error import StorageNotFoundError from .base_server import BaseAdminServer from .error import AdminSetupError from .request_context import AdminRequestContext - LOGGER = logging.getLogger(__name__) +EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$") +EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$") + +EVENT_WEBHOOK_MAPPING = { + "acapy::basicmessage::received": "basicmessages", + "acapy::problem_report": "problem_report", + "acapy::ping::received": "ping", + "acapy::ping::response_received": "ping", + "acapy::actionmenu::received": "actionmenu", + "acapy::actionmenu::get-active-menu": "get-active-menu", + "acapy::actionmenu::perform-menu-action": "perform-menu-action", +} + class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" @@ -93,7 +103,6 @@ def __init__( self, profile: Profile, send: Coroutine, - webhook: Coroutine, **kwargs, ): """ @@ -106,7 +115,6 @@ def __init__( super().__init__(**kwargs) self._profile = profile self._send = send - self._webhook = webhook async def send_outbound(self, message: OutboundMessage): """ @@ -119,53 +127,23 @@ async def send_outbound(self, message: OutboundMessage): async def send_webhook(self, topic: str, payload: dict): """ - Dispatch a webhook. + Dispatch a webhook. DEPRECATED: use the event bus instead. Args: topic: the webhook topic identifier payload: the webhook payload value """ - await self._webhook(self._profile, topic, payload) + warnings.warn( + "responder.send_webhook is deprecated; please use the event bus instead.", + DeprecationWarning, + ) + await self._profile.notify("acapy::webhook::" + topic, payload) @property def send_fn(self) -> Coroutine: """Accessor for async function to send outbound message.""" return self._send - @property - def webhook_fn(self) -> Coroutine: - """Accessor for the async function to dispatch a webhook.""" - return self._webhook - - -class WebhookTarget: - """Class for managing webhook target information.""" - - def __init__( - self, - endpoint: str, - topic_filter: Sequence[str] = None, - max_attempts: int = None, - ): - """Initialize the webhook target.""" - self.endpoint = endpoint - self.max_attempts = max_attempts - self._topic_filter = None - self.topic_filter = topic_filter # call setter - - @property - def topic_filter(self) -> Set[str]: - """Accessor for the target's topic filter.""" - return self._topic_filter - - @topic_filter.setter - def topic_filter(self, val: Sequence[str]): - """Setter for the target's topic filter.""" - filt = set(val) if val else None - if filt and "*" in filt: - filt = None - self._topic_filter = filt - @web.middleware async def ready_middleware(request: web.BaseRequest, handler: Coroutine): @@ -270,7 +248,6 @@ def __init__( self.root_profile = root_profile self.task_queue = task_queue self.webhook_router = webhook_router - self.webhook_targets = {} self.websocket_queues = {} self.site = None self.multitenant_manager = context.inject(MultitenantManager, required=False) @@ -371,7 +348,6 @@ async def setup_context(request: web.Request, handler): responder = AdminResponder( profile, self.outbound_message_router, - self.send_webhook, ) profile.context.injector.bind_instance(BaseResponder, responder) @@ -472,6 +448,19 @@ def sort_dict(raw: dict) -> dict: if plugin_registry: plugin_registry.post_process_routes(self.app) + event_bus = self.context.inject(EventBus, required=False) + if event_bus: + event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event) + event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event) + + for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items(): + event_bus.subscribe( + re.compile(re.escape(event_topic)), + lambda profile, event, webhook_topic=webhook_topic: self.send_webhook( + profile, webhook_topic, event.payload + ), + ) + # order tags alphabetically, parameters deterministically and pythonically swagger_dict = self.app._state["swagger_dict"] swagger_dict.get("tags", []).sort(key=lambda t: t["name"]) @@ -799,21 +788,17 @@ async def websocket_handler(self, request): return ws - def add_webhook_target( - self, - target_url: str, - topic_filter: Sequence[str] = None, - max_attempts: int = None, - ): - """Add a webhook target.""" - self.webhook_targets[target_url] = WebhookTarget( - target_url, topic_filter, max_attempts - ) + async def __on_webhook_event(self, profile: Profile, event: Event): + match = EVENT_PATTERN_WEBHOOK.search(event.topic) + webhook_topic = match.group(1) if match else None + if webhook_topic: + await self.send_webhook(profile, webhook_topic, event.payload) - def remove_webhook_target(self, target_url: str): - """Remove a webhook target.""" - if target_url in self.webhook_targets: - del self.webhook_targets[target_url] + async def __on_record_event(self, profile: Profile, event: Event): + match = EVENT_PATTERN_RECORD.search(event.topic) + webhook_topic = match.group(1) if match else None + if webhook_topic: + await self.send_webhook(profile, webhook_topic, event.payload) async def send_webhook(self, profile: Profile, topic: str, payload: dict): """Add a webhook to the queue, to send to all registered targets.""" @@ -825,8 +810,6 @@ async def send_webhook(self, profile: Profile, topic: str, payload: dict): metadata = {"x-wallet-id": wallet_id} if self.webhook_router: - # for idx, target in self.webhook_targets.items(): - # if not target.topic_filter or topic in target.topic_filter: for endpoint in webhook_urls: self.webhook_router( topic, diff --git a/aries_cloudagent/admin/tests/test_admin_server.py b/aries_cloudagent/admin/tests/test_admin_server.py index 0b0906c220..34dfa23b90 100644 --- a/aries_cloudagent/admin/tests/test_admin_server.py +++ b/aries_cloudagent/admin/tests/test_admin_server.py @@ -18,39 +18,6 @@ from ..server import AdminServer, AdminSetupError -class TestAdminResponder(AsyncTestCase): - async def test_admin_responder(self): - admin_responder = test_module.AdminResponder( - None, async_mock.CoroutineMock(), async_mock.CoroutineMock() - ) - - assert admin_responder.send_fn is admin_responder._send - assert admin_responder.webhook_fn is admin_responder._webhook - - message = test_module.OutboundMessage(payload="hello") - await admin_responder.send_outbound(message) - assert admin_responder._send.called_once_with(None, message) - - await admin_responder.send_webhook("topic", {"payload": "hello"}) - assert admin_responder._webhook.called_once_with("topic", {"outbound": "hello"}) - - -class TestWebhookTarget(AsyncTestCase): - async def test_webhook_target(self): - webhook_target = test_module.WebhookTarget( - endpoint="localhost:8888", - topic_filter=["birthdays", "animal videos"], - max_attempts=None, - ) - assert webhook_target.topic_filter == {"birthdays", "animal videos"} - - webhook_target.topic_filter = [] - assert webhook_target.topic_filter is None - - webhook_target.topic_filter = ["duct cleaning", "*"] - assert webhook_target.topic_filter is None - - class TestAdminServer(AsyncTestCase): async def setUp(self): self.message_results = [] diff --git a/aries_cloudagent/config/default_context.py b/aries_cloudagent/config/default_context.py index 31d0246987..2379e4b6a1 100644 --- a/aries_cloudagent/config/default_context.py +++ b/aries_cloudagent/config/default_context.py @@ -6,6 +6,7 @@ from ..cache.base import BaseCache from ..cache.in_memory import InMemoryCache +from ..core.event_bus import EventBus from ..core.plugin_registry import PluginRegistry from ..core.profile import ProfileManager, ProfileManagerProvider from ..core.protocol_registry import ProtocolRegistry @@ -42,6 +43,9 @@ async def build_context(self) -> InjectionContext: # Global protocol registry context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry()) + # Global event bus + context.injector.bind_instance(EventBus, EventBus()) + # Global did resolver registry did_resolver_registry = DIDResolverRegistry() context.injector.bind_instance(DIDResolverRegistry, did_resolver_registry) diff --git a/aries_cloudagent/connections/models/conn_record.py b/aries_cloudagent/connections/models/conn_record.py index c91eb2cdc3..041484275d 100644 --- a/aries_cloudagent/connections/models/conn_record.py +++ b/aries_cloudagent/connections/models/conn_record.py @@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool: return self is ConnRecord.State.get(other) RECORD_ID_NAME = "connection_id" - WEBHOOK_TOPIC = "connections" + RECORD_TOPIC = "connections" LOG_STATE_FLAG = "debug.connections" TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"} diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 29c40530eb..0127fd6d43 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -146,10 +146,6 @@ async def setup(self): self.dispatcher.task_queue, self.get_stats, ) - webhook_urls = context.settings.get("admin.webhook_urls") - if webhook_urls: - for url in webhook_urls: - self.admin_server.add_webhook_target(url) context.injector.bind_instance(BaseAdminServer, self.admin_server) except Exception: LOGGER.exception("Unable to register admin server") @@ -206,7 +202,6 @@ async def start(self) -> None: responder = AdminResponder( self.root_profile, self.admin_server.outbound_message_router, - self.admin_server.send_webhook, ) context.injector.bind_instance(BaseResponder, responder) @@ -398,7 +393,6 @@ def inbound_message_router( profile, message, self.outbound_message_router, - self.admin_server and self.admin_server.send_webhook, lambda completed: self.dispatch_complete(message, completed), ) except (LedgerConfigError, LedgerTransactionError) as e: diff --git a/aries_cloudagent/core/dispatcher.py b/aries_cloudagent/core/dispatcher.py index 3d313e19e2..2c8a6b69e8 100644 --- a/aries_cloudagent/core/dispatcher.py +++ b/aries_cloudagent/core/dispatcher.py @@ -9,6 +9,7 @@ import logging import os from typing import Callable, Coroutine, Union +import warnings from aiohttp.web import HTTPException @@ -88,7 +89,6 @@ def queue_message( profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine, - send_webhook: Coroutine = None, complete: Callable = None, ) -> PendingTask: """ @@ -98,7 +98,6 @@ def queue_message( profile: The profile associated with the inbound message inbound_message: The inbound message instance send_outbound: Async function to send outbound messages - send_webhook: Async function to dispatch a webhook complete: Function to call when the handler has completed Returns: @@ -106,7 +105,7 @@ def queue_message( """ return self.put_task( - self.handle_message(profile, inbound_message, send_outbound, send_webhook), + self.handle_message(profile, inbound_message, send_outbound), complete, ) @@ -115,7 +114,6 @@ async def handle_message( profile: Profile, inbound_message: InboundMessage, send_outbound: Coroutine, - send_webhook: Coroutine = None, ): """ Configure responder and message context and invoke the message handler. @@ -124,7 +122,6 @@ async def handle_message( profile: The profile associated with the inbound message inbound_message: The inbound message instance send_outbound: Async function to send outbound messages - send_webhook: Async function to dispatch a webhook Returns: The response from the handler @@ -156,7 +153,6 @@ async def handle_message( context, inbound_message, send_outbound, - send_webhook, reply_session_id=inbound_message.session_id, reply_to_verkey=inbound_message.receipt.sender_verkey, ) @@ -247,7 +243,6 @@ def __init__( context: RequestContext, inbound_message: InboundMessage, send_outbound: Coroutine, - send_webhook: Coroutine = None, **kwargs, ): """ @@ -257,14 +252,12 @@ def __init__( context: The request context of the incoming message inbound_message: The inbound message triggering this handler send_outbound: Async function to send outbound message - send_webhook: Async function to dispatch a webhook """ super().__init__(**kwargs) self._context = context self._inbound_message = inbound_message self._send = send_outbound - self._webhook = send_webhook async def create_outbound( self, message: Union[AgentMessage, str, bytes], **kwargs @@ -300,11 +293,14 @@ async def send_outbound(self, message: OutboundMessage): async def send_webhook(self, topic: str, payload: dict): """ - Dispatch a webhook. + Dispatch a webhook. DEPRECATED: use the event bus instead. Args: topic: the webhook topic identifier payload: the webhook payload value """ - if self._webhook: - await self._webhook(self._context.profile, topic, payload) + warnings.warn( + "responder.send_webhook is deprecated; please use the event bus instead.", + DeprecationWarning, + ) + await self._context.profile.notify("acapy::webhook::" + topic, payload) diff --git a/aries_cloudagent/core/event_bus.py b/aries_cloudagent/core/event_bus.py new file mode 100644 index 0000000000..bd14e96a28 --- /dev/null +++ b/aries_cloudagent/core/event_bus.py @@ -0,0 +1,119 @@ +"""A simple event bus.""" + +import logging +from itertools import chain +from typing import TYPE_CHECKING, Any, Callable, Dict, Pattern, Sequence + +if TYPE_CHECKING: # To avoid circular import error + from .profile import Profile + +LOGGER = logging.getLogger(__name__) + + +class Event: + """A simple event object.""" + + def __init__(self, topic: str, payload: Any = None): + """Create a new event.""" + self._topic = topic + self._payload = payload + + @property + def topic(self): + """Return this event's topic.""" + return self._topic + + @property + def payload(self): + """Return this event's payload.""" + return self._payload + + def __eq__(self, other): + """Test equality.""" + if not isinstance(other, Event): + return False + return self._topic == other._topic and self._payload == other._payload + + def __repr__(self): + """Return debug representation.""" + return "".format(self._topic, self._payload) + + +class EventBus: + """A simple event bus implementation.""" + + def __init__(self): + """Initialize Event Bus.""" + self.topic_patterns_to_subscribers: Dict[Pattern, Sequence[Callable]] = {} + + async def notify(self, profile: "Profile", event: Event): + """Notify subscribers of event. + + Args: + profile (Profile): context of the event + event (Event): event to emit + + """ + # TODO don't block notifier until subscribers have all been called? + # TODO trigger each processor but don't await? + # TODO log errors but otherwise ignore? + + LOGGER.debug("Notifying subscribers: %s", event) + matched = [ + processor + for pattern, processor in self.topic_patterns_to_subscribers.items() + if pattern.match(event.topic) + ] + + for processor in chain(*matched): + try: + await processor(profile, event) + except Exception: + LOGGER.exception("Error occurred while processing event") + + def subscribe(self, pattern: Pattern, processor: Callable): + """Subscribe to an event. + + Args: + pattern (Pattern): compiled regular expression for matching topics + processor (Callable): async callable accepting profile and event + + """ + LOGGER.debug("Subscribed: topic %s, processor %s", pattern, processor) + if pattern not in self.topic_patterns_to_subscribers: + self.topic_patterns_to_subscribers[pattern] = [] + self.topic_patterns_to_subscribers[pattern].append(processor) + + def unsubscribe(self, pattern: Pattern, processor: Callable): + """Unsubscribe from an event. + + This method is idempotent. Repeated calls to unsubscribe will not + result in errors. + + Args: + pattern (Pattern): regular expression used to subscribe the processor + processor (Callable): processor to unsubscribe + + """ + if pattern in self.topic_patterns_to_subscribers: + try: + index = self.topic_patterns_to_subscribers[pattern].index(processor) + except ValueError: + return + del self.topic_patterns_to_subscribers[pattern][index] + if not self.topic_patterns_to_subscribers[pattern]: + del self.topic_patterns_to_subscribers[pattern] + LOGGER.debug("Unsubscribed: topic %s, processor %s", pattern, processor) + + +class MockEventBus(EventBus): + """A mock EventBus for testing.""" + + def __init__(self): + """Initialize MockEventBus.""" + super().__init__() + self.events = [] + + async def notify(self, profile: "Profile", event: Event): + """Append the event to MockEventBus.events.""" + self.events.append((profile, event)) diff --git a/aries_cloudagent/core/profile.py b/aries_cloudagent/core/profile.py index 3900abd67c..2b7adeea38 100644 --- a/aries_cloudagent/core/profile.py +++ b/aries_cloudagent/core/profile.py @@ -5,6 +5,7 @@ from abc import ABC, abstractmethod from typing import Any, Mapping, Optional, Type +from .event_bus import EventBus, Event from ..config.base import InjectionError from ..config.injector import BaseInjector, InjectType from ..config.injection_context import InjectionContext @@ -99,6 +100,12 @@ async def close(self): async def remove(self): """Remove the profile.""" + async def notify(self, topic: str, payload: Any): + """Signal an event.""" + event_bus = self.inject(EventBus, required=False) + if event_bus: + await event_bus.notify(self, Event(topic, payload)) + def __repr__(self) -> str: """Get a human readable string.""" return "<{}(backend={}, name={})>".format( diff --git a/aries_cloudagent/core/tests/test_conductor.py b/aries_cloudagent/core/tests/test_conductor.py index 16667ac739..1ac3e2b03f 100644 --- a/aries_cloudagent/core/tests/test_conductor.py +++ b/aries_cloudagent/core/tests/test_conductor.py @@ -235,8 +235,7 @@ async def test_inbound_message_handler(self): assert mock_dispatch_q.call_args[0][0] is conductor.context assert mock_dispatch_q.call_args[0][1] is message assert mock_dispatch_q.call_args[0][2] == conductor.outbound_message_router - assert mock_dispatch_q.call_args[0][3] is None # admin webhook router - assert callable(mock_dispatch_q.call_args[0][4]) + assert callable(mock_dispatch_q.call_args[0][3]) async def test_inbound_message_handler_ledger_x(self): builder: ContextBuilder = StubContextBuilder(self.test_settings_admin) diff --git a/aries_cloudagent/core/tests/test_dispatcher.py b/aries_cloudagent/core/tests/test_dispatcher.py index 3c94b0f5b4..ba5e9bd080 100644 --- a/aries_cloudagent/core/tests/test_dispatcher.py +++ b/aries_cloudagent/core/tests/test_dispatcher.py @@ -6,6 +6,7 @@ from ...config.injection_context import InjectionContext from ...connections.models.conn_record import ConnRecord +from ...core.event_bus import EventBus from ...core.in_memory import InMemoryProfile from ...core.profile import Profile from ...core.protocol_registry import ProtocolRegistry @@ -29,6 +30,7 @@ def make_profile() -> Profile: profile = InMemoryProfile.test_profile() profile.context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry()) profile.context.injector.bind_instance(Collector, Collector()) + profile.context.injector.bind_instance(EventBus, EventBus()) return profile @@ -348,24 +350,6 @@ async def test_dispatch_log(self): ) dispatcher.log_task(mock_task) - async def test_create_outbound_send_webhook(self): - profile = make_profile() - context = RequestContext(profile) - context.message_receipt = async_mock.MagicMock(in_time=datetime_now()) - context.update_settings({"timing.enabled": True}) - message = StubAgentMessage() - responder = test_module.DispatcherResponder( - context, message, None, async_mock.CoroutineMock() - ) - result = await responder.create_outbound(message) - assert json.loads(result.payload)["@type"] == DIDCommPrefix.qualify_current( - StubAgentMessage.Meta.message_type - ) - await responder.send_webhook("topic", "payload") - - context.default_endpoint = "http://agent.ca" - assert context.default_endpoint == "http://agent.ca" - async def test_create_send_outbound(self): message = StubAgentMessage() responder = MockResponder() @@ -377,9 +361,7 @@ async def test_create_enc_outbound(self): profile = make_profile() context = RequestContext(profile) message = b"abc123xyz7890000" - responder = test_module.DispatcherResponder( - context, message, None, async_mock.CoroutineMock() - ) + responder = test_module.DispatcherResponder(context, message, None) with async_mock.patch.object( responder, "send_outbound", async_mock.CoroutineMock() ) as mock_send_outbound: diff --git a/aries_cloudagent/core/tests/test_event_bus.py b/aries_cloudagent/core/tests/test_event_bus.py new file mode 100644 index 0000000000..af99be1756 --- /dev/null +++ b/aries_cloudagent/core/tests/test_event_bus.py @@ -0,0 +1,161 @@ +"""Test Event Bus.""" + +import pytest +import re +from asynctest import mock as async_mock +from ..event_bus import EventBus, Event + +# pylint: disable=redefined-outer-name + + +@pytest.fixture +def event_bus(): + yield EventBus() + + +@pytest.fixture +def context(): + yield async_mock.MagicMock() + + +@pytest.fixture +def event(): + event = Event(topic="anything", payload="payload") + yield event + + +class TestProcessor: + def __init__(self): + self.context = None + self.event = None + + async def __call__(self, context, event): + self.context = context + self.event = event + + +@pytest.fixture +def processor(): + yield TestProcessor() + + +def test_event(event): + assert event.topic == "anything" + assert event.payload == "payload" + other = Event("anything", "payload") + another = Event("nothing", "payload") + and_another = Event("anything") + assert event == other + assert event != another + assert event != and_another + assert event != "random string" + assert repr(event) + + +def test_sub_unsub(event_bus: EventBus, processor): + """Test subscribe and unsubscribe.""" + event_bus.subscribe(re.compile(".*"), processor) + assert event_bus.topic_patterns_to_subscribers + assert event_bus.topic_patterns_to_subscribers[re.compile(".*")] == [processor] + event_bus.unsubscribe(re.compile(".*"), processor) + assert not event_bus.topic_patterns_to_subscribers + + +def test_unsub_idempotency(event_bus: EventBus, processor): + """Test unsubscribe idempotency.""" + event_bus.subscribe(re.compile(".*"), processor) + event_bus.unsubscribe(re.compile(".*"), processor) + assert not event_bus.topic_patterns_to_subscribers + event_bus.unsubscribe(re.compile(".*"), processor) + assert not event_bus.topic_patterns_to_subscribers + + +def test_unsub_unsubbed_processor(event_bus: EventBus, processor): + """Test unsubscribing an unsubscribed processor does not error.""" + event_bus.unsubscribe(re.compile(".*"), processor) + event_bus.subscribe(re.compile(".*"), processor) + another_processor = TestProcessor() + event_bus.unsubscribe(re.compile(".*"), another_processor) + + +@pytest.mark.asyncio +async def test_sub_notify(event_bus: EventBus, context, event, processor): + """Test subscriber receives event.""" + event_bus.subscribe(re.compile(".*"), processor) + await event_bus.notify(context, event) + assert processor.context == context + assert processor.event == event + + +@pytest.mark.asyncio +async def test_sub_notify_error_logged_and_exec_continues( + event_bus: EventBus, context, event, caplog +): + """Test subscriber errors are logged but do not halt execution.""" + + def _raise_exception(context, event): + raise Exception() + + processor = TestProcessor() + bad_processor = _raise_exception + event_bus.subscribe(re.compile(".*"), bad_processor) + event_bus.subscribe(re.compile(".*"), processor) + await event_bus.notify(context, event) + assert "Error occurred while processing event" in caplog.text + assert processor.context == context + assert processor.event == event + + +@pytest.mark.parametrize( + "pattern, topic", + [ + ("test", "test"), + (".*", "test"), + ("topic::with::namespace", "topic::with::namespace::like::pieces"), + ], +) +@pytest.mark.asyncio +async def test_sub_notify_regex_filtering( + event_bus: EventBus, context, processor, pattern, topic +): + """Test events are filtered correctly.""" + event = Event(topic) + event_bus.subscribe(re.compile(pattern), processor) + await event_bus.notify(context, event) + assert processor.context == context + assert processor.event == event + + +@pytest.mark.asyncio +async def test_sub_notify_no_match(event_bus: EventBus, context, event, processor): + """Test event not given to processor when pattern doesn't match.""" + event_bus.subscribe(re.compile("^$"), processor) + await event_bus.notify(context, event) + assert processor.context is None + assert processor.event is None + + +@pytest.mark.asyncio +async def test_sub_notify_only_one(event_bus: EventBus, context, event, processor): + """Test only one subscriber is called when pattern matches only one.""" + processor1 = TestProcessor() + event_bus.subscribe(re.compile(".*"), processor) + event_bus.subscribe(re.compile("^$"), processor1) + await event_bus.notify(context, event) + assert processor.context == context + assert processor.event == event + assert processor1.context is None + assert processor1.event is None + + +@pytest.mark.asyncio +async def test_sub_notify_both(event_bus: EventBus, context, event, processor): + """Test both subscribers are called when pattern matches both.""" + processor1 = TestProcessor() + event_bus.subscribe(re.compile(".*"), processor) + event_bus.subscribe(re.compile("anything"), processor1) + await event_bus.notify(context, event) + assert processor.context == context + assert processor.event == event + assert processor1.context == context + assert processor1.event == event diff --git a/aries_cloudagent/messaging/models/base_record.py b/aries_cloudagent/messaging/models/base_record.py index 058a466ba9..340ecc3ee0 100644 --- a/aries_cloudagent/messaging/models/base_record.py +++ b/aries_cloudagent/messaging/models/base_record.py @@ -5,7 +5,7 @@ import uuid from datetime import datetime -from typing import Any, Mapping, Sequence, Union +from typing import Any, Mapping, Optional, Sequence, Union from marshmallow import fields @@ -16,7 +16,6 @@ from ...storage.record import StorageRecord from .base import BaseModel, BaseModelSchema -from ..responder import BaseResponder from ..util import datetime_to_str, time_now from ..valid import INDY_ISO8601_DATETIME @@ -70,7 +69,7 @@ class Meta: DEFAULT_CACHE_TTL = 60 RECORD_ID_NAME = "id" RECORD_TYPE = None - WEBHOOK_TOPIC = None + RECORD_TOPIC: Optional[str] = None LOG_STATE_FLAG = None TAG_NAMES = {"state"} @@ -302,7 +301,7 @@ async def save( reason: str = None, log_params: Mapping[str, Any] = None, log_override: bool = False, - webhook: bool = None, + event: bool = None, ) -> str: """Persist the record to storage. @@ -310,7 +309,7 @@ async def save( session: The profile session to use reason: A reason to add to the log log_params: Additional parameters to log - webhook: Flag to override whether the webhook is sent + event: Flag to override whether the event is sent """ new_record = None log_reason = reason or ("Updated record" if self._id else "Created record") @@ -336,7 +335,7 @@ async def save( log_reason, params, override=log_override, settings=session.settings ) - await self.post_save(session, new_record, self._last_state, webhook) + await self.post_save(session, new_record, self._last_state, event) self._last_state = self.state return self._id @@ -345,8 +344,8 @@ async def post_save( self, session: ProfileSession, new_record: bool, - last_state: str, - webhook: bool = None, + last_state: Optional[str], + event: bool = None, ): """Perform post-save actions. @@ -354,15 +353,12 @@ async def post_save( session: The profile session to use new_record: Flag indicating if the record was just created last_state: The previous state value - webhook: Adjust whether the webhook is called + event: Flag to override whether the event is sent """ - webhook_topic = self.webhook_topic - if webhook is None: - webhook = bool(webhook_topic) and (new_record or (last_state != self.state)) - if webhook: - await self.send_webhook( - session, self.webhook_payload, topic=self.webhook_topic - ) + if event is None: + event = new_record or (last_state != self.state) + if event: + await self.emit_event(session, self.serialize()) async def delete_record(self, session: ProfileSession): """Remove the stored record. @@ -375,35 +371,20 @@ async def delete_record(self, session: ProfileSession): await storage.delete_record(self.storage_record) # FIXME - update state and send webhook? - @property - def webhook_payload(self): - """Return a JSON-serialized version of the record for the webhook.""" - return self.serialize() - - @property - def webhook_topic(self): - """Return the webhook topic value.""" - return self.WEBHOOK_TOPIC - - async def send_webhook( - self, session: ProfileSession, payload: Any, topic: str = None - ): - """Send a standard webhook. + async def emit_event(self, session: ProfileSession, payload: Any): + """Emit an event. Args: session: The profile session to use - payload: The webhook payload - topic: The webhook topic, defaulting to WEBHOOK_TOPIC + payload: The event payload """ - if not payload: + + if not self.RECORD_TOPIC or not self.state or not payload: return - if not topic: - topic = self.webhook_topic - if not topic: - return - responder = session.inject(BaseResponder, required=False) - if responder: - await responder.send_webhook(topic, payload) + + await session.profile.notify( + f"acapy::record::{self.RECORD_TOPIC}::{self.state}", payload + ) @classmethod def log_state( diff --git a/aries_cloudagent/messaging/models/tests/test_base_record.py b/aries_cloudagent/messaging/models/tests/test_base_record.py index e79f405408..277df78133 100644 --- a/aries_cloudagent/messaging/models/tests/test_base_record.py +++ b/aries_cloudagent/messaging/models/tests/test_base_record.py @@ -4,10 +4,10 @@ from marshmallow import EXCLUDE, fields from ....cache.base import BaseCache +from ....core.event_bus import EventBus, MockEventBus, Event from ....core.in_memory import InMemoryProfile from ....storage.base import BaseStorage, StorageDuplicateError, StorageRecord -from ...responder import BaseResponder, MockResponder from ...util import time_now from ..base_record import BaseRecord, BaseRecordSchema @@ -86,7 +86,7 @@ async def test_post_save_new(self): with async_mock.patch.object( record, "post_save", async_mock.CoroutineMock() ) as post_save: - await record.save(session, reason="reason", webhook=True) + await record.save(session, reason="reason", event=True) post_save.assert_called_once_with(session, True, None, True) mock_storage.add_record.assert_called_once() @@ -102,7 +102,7 @@ async def test_post_save_exist(self): with async_mock.patch.object( record, "post_save", async_mock.CoroutineMock() ) as post_save: - await record.save(session, reason="reason", webhook=False) + await record.save(session, reason="reason", event=False) post_save.assert_called_once_with(session, False, last_state, False) mock_storage.update_record.assert_called_once() @@ -254,17 +254,22 @@ def test_skip_log(self, mock_print): record.log_state("state", settings=None) mock_print.assert_not_called() - async def test_webhook(self): + async def test_emit_event(self): session = InMemoryProfile.test_session() - mock_responder = MockResponder() - session.context.injector.bind_instance(BaseResponder, mock_responder) + mock_event_bus = MockEventBus() + session.profile.context.injector.bind_instance(EventBus, mock_event_bus) record = BaseRecordImpl() payload = {"test": "payload"} - topic = "topic" - await record.send_webhook(session, None, None) # cover short circuit - await record.send_webhook(session, "hello", None) # cover short circuit - await record.send_webhook(session, payload, topic=topic) - assert mock_responder.webhooks == [(topic, payload)] + await record.emit_event(session, None) # cover short circuit + await record.emit_event(session, payload) # cover short circuit + record.RECORD_TOPIC = "topic" + await record.emit_event(session, payload) # cover short circuit + assert mock_event_bus.events == [] + record.state = "test_state" + await record.emit_event(session, payload) + assert mock_event_bus.events == [ + (session.profile, Event("acapy::record::topic::test_state", payload)) + ] async def test_tag_prefix(self): tags = {"~x": "a", "y": "b"} diff --git a/aries_cloudagent/messaging/responder.py b/aries_cloudagent/messaging/responder.py index 9380120978..8043d63e66 100644 --- a/aries_cloudagent/messaging/responder.py +++ b/aries_cloudagent/messaging/responder.py @@ -42,6 +42,7 @@ async def create_outbound( reply_session_id: str = None, reply_thread_id: str = None, reply_to_verkey: str = None, + reply_from_verkey: str = None, target: ConnectionTarget = None, target_list: Sequence[ConnectionTarget] = None, to_session_only: bool = False, @@ -62,6 +63,7 @@ async def create_outbound( reply_session_id=reply_session_id, reply_thread_id=reply_thread_id, reply_to_verkey=reply_to_verkey, + reply_from_verkey=reply_from_verkey, target=target, target_list=target_list, to_session_only=to_session_only, @@ -114,7 +116,7 @@ async def send_outbound(self, message: OutboundMessage): @abstractmethod async def send_webhook(self, topic: str, payload: dict): """ - Dispatch a webhook. + Dispatch a webhook. DEPRECATED: use the event bus instead. Args: topic: the webhook topic identifier @@ -128,7 +130,6 @@ class MockResponder(BaseResponder): def __init__(self): """Initialize the mock responder.""" self.messages = [] - self.webhooks = [] async def send(self, message: Union[AgentMessage, str, bytes], **kwargs): """Convert a message to an OutboundMessage and send it.""" @@ -144,4 +145,6 @@ async def send_outbound(self, message: OutboundMessage): async def send_webhook(self, topic: str, payload: dict): """Send an outbound message.""" - self.webhooks.append((topic, payload)) + raise Exception( + "responder.send_webhook is deprecated; please use the event bus instead." + ) diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/base_service.py b/aries_cloudagent/protocols/actionmenu/v1_0/base_service.py index 3d09f4ba6a..f5a6959f6f 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/base_service.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/base_service.py @@ -4,6 +4,7 @@ from ....config.injection_context import InjectionContext from ....connections.models.conn_record import ConnRecord +from ....core.profile import Profile from ....messaging.agent_message import AgentMessage from .messages.menu import Menu @@ -28,12 +29,16 @@ async def get_instance(context: InjectionContext): @abstractmethod async def get_active_menu( - self, connection: ConnRecord = None, thread_id: str = None + self, + profile: Profile, + connection: ConnRecord = None, + thread_id: str = None, ) -> Menu: """ Render the current menu. Args: + profile: The profile connection: The active connection record thread_id: The thread identifier from the requesting message. """ @@ -41,6 +46,7 @@ async def get_active_menu( @abstractmethod async def perform_menu_action( self, + profile: Profile, action_name: str, action_params: dict, connection: ConnRecord = None, @@ -50,6 +56,7 @@ async def perform_menu_action( Perform an action defined by the active menu. Args: + profile: The profile action_name: The unique name of the action being performed action_params: A collection of parameters for the action connection: The active connection record diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/driver_service.py b/aries_cloudagent/protocols/actionmenu/v1_0/driver_service.py index 043255f1fc..55ce6c6f2b 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/driver_service.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/driver_service.py @@ -3,8 +3,8 @@ import logging from ....connections.models.conn_record import ConnRecord +from ....core.profile import Profile from ....messaging.agent_message import AgentMessage -from ....messaging.responder import BaseResponder from .base_service import BaseMenuService from .messages.menu import Menu @@ -16,17 +16,21 @@ class DriverMenuService(BaseMenuService): """Driver-based action menu service.""" async def get_active_menu( - self, connection: ConnRecord = None, thread_id: str = None + self, + profile: Profile, + connection: ConnRecord = None, + thread_id: str = None, ) -> Menu: """ Render the current menu. Args: + profile: The profile connection: The active connection record thread_id: The thread identifier from the requesting message. """ - await self.send_webhook( - "get-active-menu", + await profile.notify( + "acapy::actionmenu::get-active-menu", { "connection_id": connection and connection.connection_id, "thread_id": thread_id, @@ -36,6 +40,7 @@ async def get_active_menu( async def perform_menu_action( self, + profile: Profile, action_name: str, action_params: dict, connection: ConnRecord = None, @@ -45,13 +50,14 @@ async def perform_menu_action( Perform an action defined by the active menu. Args: + profile: The profile action_name: The unique name of the action being performed action_params: A collection of parameters for the action connection: The active connection record thread_id: The thread identifier from the requesting message. """ - await self.send_webhook( - "perform-menu-action", + await profile.notify( + "acapy::actionmenu::perform-menu-action", { "connection_id": connection and connection.connection_id, "thread_id": thread_id, @@ -60,9 +66,3 @@ async def perform_menu_action( }, ) return None - - async def send_webhook(self, topic: str, payload: dict): - """Dispatch a webhook through the registered responder.""" - responder = self._context.inject(BaseResponder, required=False) - if responder: - await responder.send_webhook(topic, payload) diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/handlers/menu_request_handler.py b/aries_cloudagent/protocols/actionmenu/v1_0/handlers/menu_request_handler.py index d315704034..e25f9d007d 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/handlers/menu_request_handler.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/handlers/menu_request_handler.py @@ -29,7 +29,9 @@ async def handle(self, context: RequestContext, responder: BaseResponder): service: BaseMenuService = context.inject(BaseMenuService, required=False) if service: menu = await service.get_active_menu( - context.connection_record, context.message._thread_id + context.profile, + context.connection_record, + context.message._thread_id, ) if menu: await responder.send_reply(menu) diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/handlers/perform_handler.py b/aries_cloudagent/protocols/actionmenu/v1_0/handlers/perform_handler.py index 8f18318a7e..1f6747fb2d 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/handlers/perform_handler.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/handlers/perform_handler.py @@ -29,6 +29,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): service: BaseMenuService = context.inject(BaseMenuService, required=False) if service: reply = await service.perform_menu_action( + context.profile, context.message.name, context.message.params or {}, context.connection_record, diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_service.py b/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_service.py index 8f46f1bd29..8a94c08361 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_service.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_service.py @@ -1,9 +1,9 @@ from asynctest import TestCase as AsyncTestCase from asynctest import mock as async_mock +from .....core.event_bus import EventBus, MockEventBus from .....core.in_memory import InMemoryProfile from .....messaging.request_context import RequestContext -from .....messaging.responder import MockResponder from .. import driver_service as test_module @@ -14,8 +14,8 @@ async def setUp(self): self.context = RequestContext(self.session.profile) async def test_get_active_menu(self): - self.responder = MockResponder() - self.context.injector.bind_instance(test_module.BaseResponder, self.responder) + mock_event_bus = MockEventBus() + self.context.profile.context.injector.bind_instance(EventBus, mock_event_bus) self.menu_service = await ( test_module.DriverMenuService.service_handler()(self.context) @@ -25,20 +25,21 @@ async def test_get_active_menu(self): connection.connection_id = "connid" thread_id = "thid" - await self.menu_service.get_active_menu(connection, thread_id) + await self.menu_service.get_active_menu( + self.context.profile, connection, thread_id + ) - webhooks = self.responder.webhooks - assert len(webhooks) == 1 - (result, target) = webhooks[0] - assert result == "get-active-menu" - assert target == { + assert len(mock_event_bus.events) == 1 + (_, event) = mock_event_bus.events[0] + assert event.topic == "acapy::actionmenu::get-active-menu" + assert event.payload == { "connection_id": connection.connection_id, "thread_id": thread_id, } async def test_perform_menu_action(self): - self.responder = MockResponder() - self.context.injector.bind_instance(test_module.BaseResponder, self.responder) + mock_event_bus = MockEventBus() + self.context.profile.context.injector.bind_instance(EventBus, mock_event_bus) self.menu_service = await ( test_module.DriverMenuService.service_handler()(self.context) @@ -51,14 +52,17 @@ async def test_perform_menu_action(self): thread_id = "thid" await self.menu_service.perform_menu_action( - action_name, action_params, connection, thread_id + self.context.profile, + action_name, + action_params, + connection, + thread_id, ) - webhooks = self.responder.webhooks - assert len(webhooks) == 1 - (result, target) = webhooks[0] - assert result == "perform-menu-action" - assert target == { + assert len(mock_event_bus.events) == 1 + (_, event) = mock_event_bus.events[0] + assert event.topic == "acapy::actionmenu::perform-menu-action" + assert event.payload == { "connection_id": connection.connection_id, "thread_id": thread_id, "action_name": action_name, diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_util.py b/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_util.py index 1afc566e03..370d88e621 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_util.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/tests/test_util.py @@ -1,7 +1,7 @@ from asynctest import TestCase as AsyncTestCase +from .....core.event_bus import EventBus, MockEventBus from .....admin.request_context import AdminRequestContext -from .....messaging.responder import MockResponder from .. import util as test_module from ..models.menu_form_param import MenuFormParam @@ -13,8 +13,8 @@ class TestActionMenuUtil(AsyncTestCase): async def test_save_retrieve_delete_connection_menu(self): context = AdminRequestContext.test_context() - responder = MockResponder() - context.injector.bind_instance(test_module.BaseResponder, responder) + mock_event_bus = MockEventBus() + context.profile.context.injector.bind_instance(EventBus, mock_event_bus) menu = test_module.Menu( title="title", @@ -50,13 +50,12 @@ async def test_save_retrieve_delete_connection_menu(self): for i in range(2): # once to add, once to update await test_module.save_connection_menu(menu, connection_id, context) - webhooks = responder.webhooks - assert len(webhooks) == 1 - (result, target) = webhooks[0] - assert result == "actionmenu" - assert target["connection_id"] == connection_id - assert target["menu"] == menu.serialize() - responder.webhooks.clear() + assert len(mock_event_bus.events) == 1 + (_, event) = mock_event_bus.events[0] + assert event.topic == "acapy::actionmenu::received" + assert event.payload["connection_id"] == connection_id + assert event.payload["menu"] == menu.serialize() + mock_event_bus.events.clear() # retrieve connection menu assert ( @@ -66,12 +65,11 @@ async def test_save_retrieve_delete_connection_menu(self): # delete connection menu await test_module.save_connection_menu(None, connection_id, context) - webhooks = responder.webhooks - assert len(webhooks) == 1 - (result, target) = webhooks[0] - assert result == "actionmenu" - assert target == {"connection_id": connection_id, "menu": None} - responder.webhooks.clear() + assert len(mock_event_bus.events) == 1 + (_, event) = mock_event_bus.events[0] + assert event.topic == "acapy::actionmenu::received" + assert event.payload == {"connection_id": connection_id, "menu": None} + mock_event_bus.events.clear() # retrieve no menu assert ( diff --git a/aries_cloudagent/protocols/actionmenu/v1_0/util.py b/aries_cloudagent/protocols/actionmenu/v1_0/util.py index 41a40d6c65..fdec9510ee 100644 --- a/aries_cloudagent/protocols/actionmenu/v1_0/util.py +++ b/aries_cloudagent/protocols/actionmenu/v1_0/util.py @@ -1,7 +1,6 @@ """Action menu utility methods.""" from ....admin.request_context import AdminRequestContext -from ....messaging.responder import BaseResponder from ....storage.base import ( BaseStorage, StorageRecord, @@ -54,12 +53,10 @@ async def save_connection_menu( else: await storage.delete_record(record) - responder = context.inject(BaseResponder, required=False) - if responder: - await responder.send_webhook( - "actionmenu", - { - "connection_id": connection_id, - "menu": menu.serialize() if menu else None, - }, - ) + await context.profile.notify( + "acapy::actionmenu::received", + { + "connection_id": connection_id, + "menu": menu.serialize() if menu else None, + }, + ) diff --git a/aries_cloudagent/protocols/basicmessage/v1_0/handlers/basicmessage_handler.py b/aries_cloudagent/protocols/basicmessage/v1_0/handlers/basicmessage_handler.py index a44bb5aad7..6e9798dd7b 100644 --- a/aries_cloudagent/protocols/basicmessage/v1_0/handlers/basicmessage_handler.py +++ b/aries_cloudagent/protocols/basicmessage/v1_0/handlers/basicmessage_handler.py @@ -34,15 +34,18 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ): meta["copy_invite"] = True - await responder.send_webhook( - "basicmessages", - { - "connection_id": context.connection_record.connection_id, - "message_id": context.message._id, - "content": body, - "state": "received", - }, - ) + payload = { + "connection_id": context.connection_record.connection_id, + "message_id": context.message._id, + "content": body, + "state": "received", + "sent_time": context.message.sent_time, + } + + if "l10n" in context.message._decorators: + payload["locale"] = context.message._decorators["l10n"].locale + + await context.profile.notify("acapy::basicmessage::received", payload) reply = None if body: diff --git a/aries_cloudagent/protocols/basicmessage/v1_0/handlers/tests/test_basicmessage_handler.py b/aries_cloudagent/protocols/basicmessage/v1_0/handlers/tests/test_basicmessage_handler.py index 05c7405cb7..0f70218901 100644 --- a/aries_cloudagent/protocols/basicmessage/v1_0/handlers/tests/test_basicmessage_handler.py +++ b/aries_cloudagent/protocols/basicmessage/v1_0/handlers/tests/test_basicmessage_handler.py @@ -1,10 +1,11 @@ -import pytest from unittest import mock -from ......messaging.base_handler import HandlerException +import pytest + +from ......core.event_bus import EventBus, MockEventBus, Event +from ......messaging.decorators.localization_decorator import LocalizationDecorator from ......messaging.request_context import RequestContext from ......messaging.responder import MockResponder - from ...handlers.basicmessage_handler import BasicMessageHandler from ...messages.basicmessage import BasicMessage @@ -17,6 +18,8 @@ def request_context() -> RequestContext: class TestBasicMessageHandler: @pytest.mark.asyncio async def test_basic_message(self, request_context): + mock_event_bus = MockEventBus() + request_context.profile.context.injector.bind_instance(EventBus, mock_event_bus) request_context.connection_record = mock.MagicMock() test_message_content = "http://aries.ca/hello" request_context.message = BasicMessage(content=test_message_content) @@ -26,16 +29,19 @@ async def test_basic_message(self, request_context): await handler.handle(request_context, responder) messages = responder.messages assert len(messages) == 0 - hooks = responder.webhooks - assert len(hooks) == 1 - assert hooks[0] == ( - "basicmessages", - { - "connection_id": request_context.connection_record.connection_id, - "message_id": request_context.message._id, - "content": test_message_content, - "state": "received", - }, + assert len(mock_event_bus.events) == 1 + assert mock_event_bus.events[0] == ( + request_context.profile, + Event( + "acapy::basicmessage::received", + { + "connection_id": request_context.connection_record.connection_id, + "message_id": request_context.message._id, + "content": test_message_content, + "state": "received", + "sent_time": request_context.message.sent_time, + }, + ), ) @pytest.mark.asyncio @@ -63,7 +69,8 @@ async def test_basic_message_response_reply_with(self, request_context): request_context.default_label = "agent" test_message_content = "Reply with: g'day" request_context.message = BasicMessage( - content=test_message_content, localization="en-CA" + content=test_message_content, + localization=LocalizationDecorator(locale="en-CA"), ) request_context.connection_ready = True handler = BasicMessageHandler() diff --git a/aries_cloudagent/protocols/endorse_transaction/v1_0/models/transaction_record.py b/aries_cloudagent/protocols/endorse_transaction/v1_0/models/transaction_record.py index 3e1e1cc0b0..841071a551 100644 --- a/aries_cloudagent/protocols/endorse_transaction/v1_0/models/transaction_record.py +++ b/aries_cloudagent/protocols/endorse_transaction/v1_0/models/transaction_record.py @@ -25,7 +25,7 @@ class Meta: TAG_NAMES = {"state", "thread_id", "connection_id"} RECORD_TYPE = "transaction" STATE_INIT = "init" - WEBHOOK_TOPIC = "endorse_transaction" + RECORD_TOPIC = "endorse_transaction" SIGNATURE_REQUEST = "http://didcomm.org/sign-attachment/%VER/signature-request" diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py index 682f69238e..fc830bf534 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py @@ -22,7 +22,7 @@ class Meta: RECORD_TYPE = "credential_exchange_v10" RECORD_ID_NAME = "credential_exchange_id" - WEBHOOK_TOPIC = "issue_credential" + RECORD_TOPIC = "issue_credential" TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py index 924479c60b..50656bf086 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/cred_ex_record.py @@ -27,7 +27,7 @@ class Meta: RECORD_TYPE = "cred_ex_v20" RECORD_ID_NAME = "cred_ex_id" - WEBHOOK_TOPIC = "issue_credential_v2_0" + RECORD_TOPIC = "issue_credential_v2_0" TAG_NAMES = {"~thread_id"} if UNENCRYPTED_TAGS else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py index 7aa1c44288..02eb70c9ce 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/dif.py @@ -22,7 +22,7 @@ class Meta: RECORD_ID_NAME = "cred_ex_dif_id" RECORD_TYPE = "dif_cred_ex_v20" TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"} - WEBHOOK_TOPIC = "issue_credential_v2_0_dif" + RECORD_TOPIC = "issue_credential_v2_0_dif" def __init__( self, diff --git a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py index 87e79a9b3f..c252a9ca7b 100644 --- a/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py +++ b/aries_cloudagent/protocols/issue_credential/v2_0/models/detail/indy.py @@ -22,7 +22,7 @@ class Meta: RECORD_ID_NAME = "cred_ex_indy_id" RECORD_TYPE = "indy_cred_ex_v20" TAG_NAMES = {"~cred_ex_id"} if UNENCRYPTED_TAGS else {"cred_ex_id"} - WEBHOOK_TOPIC = "issue_credential_v2_0_indy" + RECORD_TOPIC = "issue_credential_v2_0_indy" def __init__( self, diff --git a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py index 180d5df60d..eb466bd5c7 100644 --- a/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py +++ b/aries_cloudagent/protocols/out_of_band/v1_0/models/invitation.py @@ -18,7 +18,7 @@ class Meta: RECORD_TYPE = "oob_invitation" RECORD_ID_NAME = "invitation_id" - WEBHOOK_TOPIC = "oob_invitation" + RECORD_TOPIC = "oob_invitation" TAG_NAMES = {"invi_msg_id"} STATE_INITIAL = "initial" diff --git a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py index 1437b713d0..a969bf4bfa 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py @@ -21,7 +21,7 @@ class Meta: RECORD_TYPE = "presentation_exchange_v10" RECORD_ID_NAME = "presentation_exchange_id" - WEBHOOK_TOPIC = "present_proof" + RECORD_TOPIC = "present_proof" TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/present_proof/v2_0/models/pres_exchange.py b/aries_cloudagent/protocols/present_proof/v2_0/models/pres_exchange.py index 13e0658035..fa1fcda706 100644 --- a/aries_cloudagent/protocols/present_proof/v2_0/models/pres_exchange.py +++ b/aries_cloudagent/protocols/present_proof/v2_0/models/pres_exchange.py @@ -26,7 +26,7 @@ class Meta: RECORD_TYPE = "pres_ex_v20" RECORD_ID_NAME = "pres_ex_id" - WEBHOOK_TOPIC = "present_proof_v2_0" + RECORD_TOPIC = "present_proof_v2_0" TAG_NAMES = {"~thread_id"} if unencrypted_tags else {"thread_id"} INITIATOR_SELF = "self" diff --git a/aries_cloudagent/protocols/problem_report/v1_0/handler.py b/aries_cloudagent/protocols/problem_report/v1_0/handler.py index a403bcbf28..66a470f7f6 100644 --- a/aries_cloudagent/protocols/problem_report/v1_0/handler.py +++ b/aries_cloudagent/protocols/problem_report/v1_0/handler.py @@ -30,4 +30,6 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context.message, ) - await responder.send_webhook("problem_report", context.message.serialize()) + await context.profile.notify( + "acapy::problem_report", context.message.serialize() + ) diff --git a/aries_cloudagent/protocols/problem_report/v1_0/tests/test_handler.py b/aries_cloudagent/protocols/problem_report/v1_0/tests/test_handler.py index ae3db794c8..b42138b9e8 100644 --- a/aries_cloudagent/protocols/problem_report/v1_0/tests/test_handler.py +++ b/aries_cloudagent/protocols/problem_report/v1_0/tests/test_handler.py @@ -1,5 +1,6 @@ import pytest +from .....core.event_bus import EventBus, MockEventBus, Event from .....messaging.base_handler import HandlerException from .....messaging.request_context import RequestContext from .....messaging.responder import MockResponder @@ -17,6 +18,9 @@ def request_context() -> RequestContext: class TestPingHandler: @pytest.mark.asyncio async def test_problem_report(self, request_context): + mock_event_bus = MockEventBus() + request_context.profile.context.injector.bind_instance(EventBus, mock_event_bus) + request_context.message_receipt = MessageReceipt() request_context.message = ProblemReport() request_context.connection_ready = True @@ -25,6 +29,8 @@ async def test_problem_report(self, request_context): await handler.handle(request_context, responder) messages = responder.messages assert len(messages) == 0 - hooks = responder.webhooks - assert len(hooks) == 1 - assert hooks[0] == ("problem_report", request_context.message.serialize()) + assert len(mock_event_bus.events) == 1 + (profile, event) = mock_event_bus.events[0] + assert profile == request_context.profile + assert event.topic == "acapy::problem_report" + assert event.payload == request_context.message.serialize() diff --git a/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_handler.py b/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_handler.py index 41dc43e63c..11817d59e4 100644 --- a/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_handler.py +++ b/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_handler.py @@ -43,8 +43,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): await responder.send_reply(reply) if context.settings.get("debug.monitor_ping"): - await responder.send_webhook( - "ping", + await context.profile.notify( + "acapy::ping::received", { "comment": context.message.comment, "connection_id": context.message_receipt.connection_id, diff --git a/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_response_handler.py b/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_response_handler.py index 69f8b3c0ae..18de38ad56 100644 --- a/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_response_handler.py +++ b/aries_cloudagent/protocols/trustping/v1_0/handlers/ping_response_handler.py @@ -30,8 +30,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) if context.settings.get("debug.monitor_ping"): - await responder.send_webhook( - "ping", + await context.profile.notify( + "acapy::ping::response_received", { "comment": context.message.comment, "connection_id": context.message_receipt.connection_id, diff --git a/aries_cloudagent/revocation/models/issuer_cred_rev_record.py b/aries_cloudagent/revocation/models/issuer_cred_rev_record.py index af43cfc491..cb87c070b9 100644 --- a/aries_cloudagent/revocation/models/issuer_cred_rev_record.py +++ b/aries_cloudagent/revocation/models/issuer_cred_rev_record.py @@ -24,7 +24,7 @@ class Meta: RECORD_TYPE = "issuer_cred_rev" RECORD_ID_NAME = "record_id" - WEBHOOK_TOPIC = "issuer_cred_rev" + RECORD_TOPIC = "issuer_cred_rev" TAG_NAMES = { "cred_ex_id", "cred_def_id", diff --git a/aries_cloudagent/revocation/models/issuer_rev_reg_record.py b/aries_cloudagent/revocation/models/issuer_rev_reg_record.py index 11be63d572..e3e0e5885c 100644 --- a/aries_cloudagent/revocation/models/issuer_rev_reg_record.py +++ b/aries_cloudagent/revocation/models/issuer_rev_reg_record.py @@ -47,7 +47,7 @@ class Meta: RECORD_ID_NAME = "record_id" RECORD_TYPE = "issuer_rev_reg" - WEBHOOK_TOPIC = "revocation_registry" + RECORD_TOPIC = "revocation_registry" LOG_STATE_FLAG = "debug.revocation" TAG_NAMES = { "cred_def_id", diff --git a/aries_cloudagent/transport/inbound/session.py b/aries_cloudagent/transport/inbound/session.py index c92af9c2ad..88df01be8e 100644 --- a/aries_cloudagent/transport/inbound/session.py +++ b/aries_cloudagent/transport/inbound/session.py @@ -177,7 +177,6 @@ async def handle_relay_context(self, payload_enc: Union[str, bytes]): responder = AdminResponder( profile, base_responder.send_fn, - base_responder.webhook_fn, ) profile.context.injector.bind_instance(BaseResponder, responder)