Skip to content

Commit

Permalink
feat: send webhooks for forward messages
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <timo@animo.id>
  • Loading branch information
TimoGlastra committed May 5, 2021
1 parent e847cd1 commit d521075
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 18 deletions.
13 changes: 13 additions & 0 deletions AdminAPI.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ When a webhook is dispatched, the record `topic` is appended as a path component
* `content`: the contents of the agent message
* `state`: `received`

#### Forward Message Received (`/forward`)

Enable using `--monitor-forward`.

* `connection_id`: the identifier of the connection associated with the recipient key
* `recipient_key`: the recipient key of the forward message (`to` field of the forward message)
* `status`: The delivery status of the received forward message. Possible values:
* `sent_to_session`: Message is sent directly to the connection over an active transport session
* `sent_to_external_queue`: Message is sent to external queue. No information is known on the delivery of the message
* `queued_for_delivery`: Message is queued for delivery using outbound transport (recipient connection has an endpoint)
* `waiting_for_pickup`: The connection has no reachable endpoint. Need to wait for the recipient to connect with return routing for delivery
* `undeliverable`: The connection has not reachable endpoint, and the internal queue for messages is not enabled (`--enable-undelivered-queue`).

#### Credential Exchange Record Updated (`/issue_credential`)

* `credential_exchange_id`: the unique identifier of the credential exchange
Expand Down
4 changes: 4 additions & 0 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ def sort_dict(raw: dict) -> dict:
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self._on_webhook_event)
event_bus.subscribe(EVENT_PATTERN_RECORD, self._on_record_event)

# Only include forward webhook events if the option is enabled
if self.context.settings.get_bool("monitor_forward", False):
EVENT_WEBHOOK_MAPPING["acapy::forward::received"] = "forward"

for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
event_bus.subscribe(
re.compile(re.escape(event_topic)),
Expand Down
8 changes: 8 additions & 0 deletions aries_cloudagent/config/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ def add_arguments(self, parser: ArgumentParser):
env_var="ACAPY_MONITOR_PING",
help="Send a webhook when a ping is sent or received.",
)
parser.add_argument(
"--monitor-forward",
action="store_true",
env_var="ACAPY_MONITOR_FORWARD",
help="Send a webhook when a forward is received.",
)
parser.add_argument(
"--public-invites",
action="store_true",
Expand Down Expand Up @@ -851,6 +857,8 @@ def get_settings(self, args: Namespace) -> dict:
settings["invite_base_url"] = args.invite_base_url
if args.monitor_ping:
settings["debug.monitor_ping"] = args.monitor_ping
if args.monitor_forward:
settings["monitor_forward"] = args.monitor_forward
if args.public_invites:
settings["public_invites"] = True
if args.timing:
Expand Down
31 changes: 21 additions & 10 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,7 @@ async def outbound_message_router(
return OutboundSendStatus.SENT_TO_SESSION

if not outbound.to_session_only:
await self.queue_outbound(profile, outbound, inbound)
return OutboundSendStatus.QUEUED_FOR_DELIVERY
return await self.queue_outbound(profile, outbound, inbound)

def handle_not_returned(self, profile: Profile, outbound: OutboundMessage):
"""Handle a message that failed delivery via an inbound session."""
Expand All @@ -490,7 +489,7 @@ async def queue_outbound(
profile: Profile,
outbound: OutboundMessage,
inbound: InboundMessage = None,
):
) -> OutboundSendStatus:
"""
Queue an outbound message for transport.
Expand Down Expand Up @@ -526,15 +525,15 @@ async def queue_outbound(
# internal queue usually results in the message to be sent over
# ACA-py `-ot` transport.
if self.outbound_queue:
await self._queue_external(profile, outbound)
return await self._queue_external(profile, outbound)
else:
self._queue_internal(profile, outbound)
return self._queue_internal(profile, outbound)

async def _queue_external(
self,
profile: Profile,
outbound: OutboundMessage,
):
) -> OutboundSendStatus:
"""Save the message to an external outbound queue."""
async with self.outbound_queue:
targets = (
Expand All @@ -545,17 +544,29 @@ async def _queue_external(
outbound.payload, target.endpoint
)

def _queue_internal(self, profile: Profile, outbound: OutboundMessage):
return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE

def _queue_internal(
self, profile: Profile, outbound: OutboundMessage
) -> OutboundSendStatus:
"""Save the message to an internal outbound queue."""
try:
self.outbound_transport_manager.enqueue_message(profile, outbound)
return OutboundSendStatus.QUEUED_FOR_DELIVERY
except OutboundDeliveryError:
LOGGER.warning("Cannot queue message for delivery, no supported transport")
self.handle_not_delivered(profile, outbound)
return self.handle_not_delivered(profile, outbound)

def handle_not_delivered(self, profile: Profile, outbound: OutboundMessage):
def handle_not_delivered(
self, profile: Profile, outbound: OutboundMessage
) -> OutboundSendStatus:
"""Handle a message that failed delivery via outbound transports."""
self.inbound_transport_manager.return_undelivered(outbound)
queued_for_inbound = self.inbound_transport_manager.return_undelivered(outbound)

if queued_for_inbound:
return OutboundSendStatus.WAITING_FOR_PICKUP
else:
return OutboundSendStatus.UNDELIVERABLE

def webhook_router(
self,
Expand Down
13 changes: 10 additions & 3 deletions aries_cloudagent/messaging/responder.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,24 @@ def __init__(self):
"""Initialize the mock responder."""
self.messages = []

async def send(self, message: Union[AgentMessage, str, bytes], **kwargs):
async def send(
self, message: Union[AgentMessage, str, bytes], **kwargs
) -> OutboundSendStatus:
"""Convert a message to an OutboundMessage and send it."""
self.messages.append((message, kwargs))
return OutboundSendStatus.QUEUED_FOR_DELIVERY

async def send_reply(self, message: Union[AgentMessage, str, bytes], **kwargs):
async def send_reply(
self, message: Union[AgentMessage, str, bytes], **kwargs
) -> OutboundSendStatus:
"""Send a reply to an incoming message."""
self.messages.append((message, kwargs))
return OutboundSendStatus.QUEUED_FOR_DELIVERY

async def send_outbound(self, message: OutboundMessage):
async def send_outbound(self, message: OutboundMessage) -> OutboundSendStatus:
"""Send an outbound message."""
self.messages.append((message, None))
return OutboundSendStatus.QUEUED_FOR_DELIVERY

async def send_webhook(self, topic: str, payload: dict):
"""Send an outbound message."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
self._logger.info(
f"Forwarding message to connection: {recipient.connection_id}"
)
await responder.send(

send_status = await responder.send(
packed,
connection_id=recipient.connection_id,
target_list=connection_targets,
reply_to_verkey=connection_verkey,
)

# emit event that a forward message is received (may trigger webhook event)
await context.profile.notify(
"acapy::forward::received",
{
"connection_id": recipient.connection_id,
"status": send_status.value,
"recipient_key": context.message.to,
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ async def test_handle(self):
test_module, "RoutingManager", autospec=True
) as mock_mgr, async_mock.patch.object(
test_module, "ConnectionManager", autospec=True
) as mock_connection_mgr:
) as mock_connection_mgr, async_mock.patch.object(
self.context.profile, "notify", autospec=True
) as mock_notify:
mock_mgr.return_value.get_recipient = async_mock.CoroutineMock(
return_value=RouteRecord(connection_id="dummy")
)
Expand All @@ -49,6 +51,15 @@ async def test_handle(self):

await handler.handle(self.context, responder)

mock_notify.assert_called_once_with(
"acapy::forward::received",
{
"connection_id": "dummy",
"status": "queued_for_delivery",
"recipient_key": "sample-did",
},
)

messages = responder.messages
assert len(messages) == 1
(result, target) = messages[0]
Expand Down
19 changes: 16 additions & 3 deletions aries_cloudagent/transport/outbound/status.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
"""Enum representing captured send status of outbound messages."""

from enum import Enum, auto
from enum import Enum


class OutboundSendStatus(Enum):
"""Send status of outbound messages."""

SENT_TO_SESSION = auto()
QUEUED_FOR_DELIVERY = auto()
# Could directly send the message to the connection over active session
SENT_TO_SESSION = "sent_to_session"

# Message is sent to external queue. We don't know how it will process the queue
SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"

# Message is queued for delivery using outbound transport (recipient has endpoint)
QUEUED_FOR_DELIVERY = "queued_for_delivery"

# No endpoint available.
# Need to wait for the recipient to connect with return routing for delivery
WAITING_FOR_PICKUP = "waiting_for_pickup"

# No endpoint available, and no internal queue for messages.
UNDELIVERABLE = "undeliverable"

0 comments on commit d521075

Please sign in to comment.