Skip to content

Commit

Permalink
Merge pull request #1530 from Indicio-tech/fix/mediation-ws-ping
Browse files Browse the repository at this point in the history
Enable WS Pings for WS Inbound Transport
  • Loading branch information
swcurran authored Dec 6, 2021
2 parents 5f8b75a + 9da7f23 commit fa749d2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
26 changes: 26 additions & 0 deletions aries_cloudagent/config/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,28 @@ def add_arguments(self, parser: ArgumentParser):
"accumulated messages in message queue. Default value is 4."
),
)
parser.add_argument(
"--ws-heartbeat-interval",
default=3,
type=BoundedInt(min=1),
env_var="ACAPY_WS_HEARTBEAT_INTERVAL",
metavar="<interval>",
help=(
"When using Websocket Inbound Transport, send WS pings every "
"<interval> seconds."
),
)
parser.add_argument(
"--ws-timeout-interval",
default=15,
type=BoundedInt(min=1),
env_var="ACAPY_WS_TIMEOUT_INTERVAL",
metavar="<interval>",
help=(
"When using Websocket Inbound Transport, timeout the WS connection "
"after <interval> seconds without a heartbeat ping."
),
)

def get_settings(self, args: Namespace):
"""Extract transport settings."""
Expand Down Expand Up @@ -1250,6 +1272,10 @@ def get_settings(self, args: Namespace):
settings["transport.max_message_size"] = args.max_message_size
if args.max_outbound_retry:
settings["transport.max_outbound_retry"] = args.max_outbound_retry
if args.ws_heartbeat_interval:
settings["transport.ws.heartbeat_interval"] = args.ws_heartbeat_interval
if args.ws_timeout_interval:
settings["transport.ws.timeout_interval"] = args.ws_timeout_interval

return settings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ def setUp(self):
self.message_results = []
self.port = unused_port()
self.session = None
self.transport = WsTransport("0.0.0.0", self.port, self.create_session)
self.profile = InMemoryProfile.test_profile()
self.transport = WsTransport(
"0.0.0.0", self.port, self.create_session, root_profile=self.profile
)
self.transport.wire_format = JsonWireFormat()
self.result_event = None
super().setUp()
Expand Down
20 changes: 19 additions & 1 deletion aries_cloudagent/transport/inbound/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ..error import WireFormatParseError
from .base import BaseInboundTransport, InboundTransportSetupError


LOGGER = logging.getLogger(__name__)


Expand All @@ -29,6 +30,12 @@ def __init__(self, host: str, port: int, create_session, **kwargs) -> None:
self.host = host
self.port = port
self.site: web.BaseSite = None
self.heartbeat_interval: int = self.root_profile.settings.get_int(
"transport.ws.heartbeat_interval"
)
self.timout_interval: int = self.root_profile.settings.get_int(
"transport.ws.timout_interval"
)

# TODO: set scheme dynamically based on SSL settings (ws/wss)

Expand Down Expand Up @@ -81,7 +88,11 @@ async def inbound_message_handler(self, request):
"""

ws = web.WebSocketResponse()
ws = web.WebSocketResponse(
autoping=True,
heartbeat=self.heartbeat_interval,
receive_timeout=self.timout_interval,
)
await ws.prepare(request)
loop = asyncio.get_event_loop()

Expand Down Expand Up @@ -113,6 +124,13 @@ async def inbound_message_handler(self, request):
"Websocket connection closed with exception: %s",
ws.exception(),
)
else:
LOGGER.error(
"Unexpected Websocket message type received: %s: %s, %s",
msg.type,
msg.data,
msg.extra,
)
if not ws.closed:
inbound = loop.create_task(ws.receive())

Expand Down

0 comments on commit fa749d2

Please sign in to comment.