From 550ee32d166f625cff1d8cd22a628cc9f5c9f7a3 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Wed, 6 Sep 2023 10:04:39 -0400 Subject: [PATCH 1/6] Multiple resource leaks fixed in proxy and dealer --- crossbar/router/dealer.py | 52 +++++--- crossbar/worker/proxy.py | 274 +++++++++++++++++++++++++------------- 2 files changed, 216 insertions(+), 110 deletions(-) diff --git a/crossbar/router/dealer.py b/crossbar/router/dealer.py index 083fab783..24fd99abe 100644 --- a/crossbar/router/dealer.py +++ b/crossbar/router/dealer.py @@ -40,6 +40,7 @@ class InvocationRequest(object): 'id', 'registration', 'caller', + 'caller_session_id', 'call', 'callee', 'forward_for', @@ -53,6 +54,7 @@ def __init__(self, id, registration, caller, call, callee, forward_for, authoriz self.id = id self.registration = registration self.caller = caller + self.caller_session_id = caller._session_id self.call = call self.callee = callee self.forward_for = forward_for @@ -185,6 +187,7 @@ def detach(self, session): is_rlink_session = (session._authrole == "rlink") if session in self._caller_to_invocations: + # this needs to update all four places where we track invocations similar to _remove_invoke_request outstanding = self._caller_to_invocations.get(session, []) for invoke in outstanding: # type: InvocationRequest if invoke.canceled: @@ -207,11 +210,22 @@ def detach(self, session): request=invoke.id, session=session._session_id, ) + + invokes = self._callee_to_invocations[callee] + invokes.remove(invoke) + if not invokes: + del self._callee_to_invocations[callee] + + del self._invocations[invoke.id] + del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] + self._router.send(invoke.callee, message.Interrupt( request=invoke.id, mode=message.Cancel.KILLNOWAIT, )) + del self._caller_to_invocations[session] + if session in self._session_to_registrations: # send out Errors for any in-flight calls we have @@ -235,6 +249,17 @@ def detach(self, session): if invoke.caller._transport: invoke.caller._transport.send(reply) + invokes = self._caller_to_invocations[invoke.caller] + invokes.remove(invoke) + if not invokes: + del self._caller_to_invocations[invoke.caller] + + del self._invocations[invoke.id] + del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] + + if outstanding: + del self._callee_to_invocations[session] + for registration in self._session_to_registrations[session]: was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) @@ -1120,23 +1145,20 @@ def _remove_invoke_request(self, invocation_request): invocation_request.timeout_call.cancel() invocation_request.timeout_call = None - invokes = self._callee_to_invocations[invocation_request.callee] - invokes.remove(invocation_request) - if not invokes: - del self._callee_to_invocations[invocation_request.callee] - - invokes = self._caller_to_invocations[invocation_request.caller] - invokes.remove(invocation_request) - if not invokes: - del self._caller_to_invocations[invocation_request.caller] + # all four places should always be updated together + if invocation_request.id in self._invocations: + del self._invocations[invocation_request.id] + invokes = self._callee_to_invocations[invocation_request.callee] + invokes.remove(invocation_request) + if not invokes: + del self._callee_to_invocations[invocation_request.callee] - del self._invocations[invocation_request.id] + invokes = self._caller_to_invocations[invocation_request.caller] + invokes.remove(invocation_request) + if not invokes: + del self._caller_to_invocations[invocation_request.caller] - # the session_id will be None if the caller session has - # already vanished - caller_id = invocation_request.caller._session_id - if caller_id is not None: - del self._invocations_by_call[caller_id, invocation_request.call.request] + del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request] # noinspection PyUnusedLocal def processCancel(self, session, cancel): diff --git a/crossbar/worker/proxy.py b/crossbar/worker/proxy.py index 1dd9c2e36..476ccd9f6 100644 --- a/crossbar/worker/proxy.py +++ b/crossbar/worker/proxy.py @@ -189,8 +189,9 @@ def onClose(self, wasClean): :param wasClean: Indicates if the transport has been closed regularly. :type wasClean: bool """ - self.log.info('{func} proxy frontend session closed (wasClean={wasClean})', + self.log.info('{func} proxy frontend session {sessionId} closed (wasClean={wasClean})', func=hltype(self.onClose), + sessionId=hlid(self._session_id), wasClean=wasClean) # actually, at this point, the backend session should already be gone, but better check! @@ -324,6 +325,11 @@ def backend_connected(backend: ProxyBackendSession): # first, wait for the WAMP-level transport to connect before starting to join yield backend._on_connect + # while we were yielding, frontend session might have been closed (transport disconnected) + if self.transport is None: + backend.disconnect() + raise TransportLost("Proxy frontend session disconnected while connecting to backend") + # node private key key = _read_node_key(self._controller._cbdir, private=False) @@ -428,6 +434,16 @@ def backend_failed(fail): return result + def _send_if_transport_is_alive(self, msg): + # this is a debugging helper + # We received a message on the backend connection (auth, welcome) and need to send it to the client + # this is semantically the same as _forward, and it solve the same issues + # We don't send here, but we also do not cause an exception if the transport is gone + if self.transport: + self.transport.send(msg) + else: + self.log.debug('Trying to send a message to the client, but no frontend transport! [{msg}]', msg=msg) + def _forward(self, msg): # we received a message on the backend connection: forward to client over frontend connection if self.transport: @@ -609,7 +625,7 @@ def _process_Hello(self, msg): hello_result = yield as_future(self._pending_auth.hello, realm, details) except Exception as e: self.log.failure() - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='Frontend connection accept failed ({})'.format(e))) return @@ -618,61 +634,94 @@ def _process_Hello(self, msg): authmethod=hlval(authmethod), hello_result=hello_result) - # if the frontend session is accepted right away (eg when doing "anonymous" authentication), process the - # frontend accept .. - if isinstance(hello_result, types.Accept): - try: - # get a backend session mapped to the incoming frontend session - session = yield self.frontend_accepted(hello_result) - except Exception as e: - self.log.failure() - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='Frontend connection accept failed ({})'.format(e))) - return + #check if client disconnected while we were yielding to authenticator + if not self.transport: + self.log.debug( + '{func} proxy frontend disconnected while processing hello in authenticator:' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + return + else: + # if the frontend session is accepted right away (eg when doing "anonymous" authentication), process the + # frontend accept .. + if isinstance(hello_result, types.Accept): + try: + # get a backend session mapped to the incoming frontend session + session = yield self.frontend_accepted(hello_result) + except Exception as e: + self.log.failure() + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='Frontend connection accept failed ({})'.format(e))) + return - def _on_backend_joined(session, details): - # we now got everything! the frontend is authenticated, and a backend session is associated. - msg = message.Welcome(self._session_id, - ProxyFrontendSession.ROLES, - realm=details.realm, - authid=details.authid, - authrole=details.authrole, - authmethod=hello_result.authmethod, - authprovider=hello_result.authprovider, - authextra=dict(details.authextra or {}, **self._custom_authextra)) - self._backend_session = session - self.transport.send(msg) - self.log.debug( - '{func} proxy frontend session WELCOME: session_id={session}, session={session}, ' - 'details="{details}"', - func=hltype(self._process_Hello), - session_id=hlid(self._session_id), - session=self, - details=details) + if not self.transport: + # we have not yet established a backend session, only authenticator session was used + self.log.debug('{func} proxy frontend disconnected while connecting backend session' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + self._controller.unmap_backend(self, session) + self._backend_session = None + else: + def _on_backend_joined(session, details): + # we now got everything! the frontend is authenticated, and a backend session is associated. + msg = message.Welcome(self._session_id, + ProxyFrontendSession.ROLES, + realm=details.realm, + authid=details.authid, + authrole=details.authrole, + authmethod=hello_result.authmethod, + authprovider=hello_result.authprovider, + authextra=dict(details.authextra or {}, **self._custom_authextra)) + if self.transport: + self._backend_session = session + self.transport.send(msg) + self.log.debug( + '{func} proxy frontend session WELCOME: session_id={session_id}, session={session}, ' + 'details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + else: + self.log.debug( + '{func} proxy frontend disconnected while joining backend session' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + self._controller.unmap_backend(self, session) + self._backend_session = None - session.on('join', _on_backend_joined) + session.on('join', _on_backend_joined) - # if the client is required to do an authentication message exchange, answer sending a CHALLENGE message - elif isinstance(hello_result, types.Challenge): - self.transport.send(message.Challenge(hello_result.method, extra=hello_result.extra)) + # if the client is required to do an authentication message exchange, answer sending a CHALLENGE message + elif isinstance(hello_result, types.Challenge): + self._send_if_transport_is_alive(message.Challenge(hello_result.method, extra=hello_result.extra)) - # if the client is denied right away, answer by sending an ABORT message - elif isinstance(hello_result, types.Deny): - self.transport.send(message.Abort(hello_result.reason, message=hello_result.message)) + # if the client is denied right away, answer by sending an ABORT message + elif isinstance(hello_result, types.Deny): + self._send_if_transport_is_alive(message.Abort(hello_result.reason, message=hello_result.message)) - else: - # should not arrive here: internal (logic) error - self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', - rtype=hltype(hello_result), - func=hltype(self._process_Hello)) - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='internal error: unexpected authenticator return type {}'.format( - type(hello_result)))) - return + else: + # should not arrive here: internal (logic) error + self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', + rtype=hltype(hello_result), + func=hltype(self._process_Hello)) + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='internal error: unexpected authenticator return type {}'.format( + type(hello_result)))) + return - self.transport.send(message.Abort(ApplicationError.NO_AUTH_METHOD, message='no suitable authmethod found')) + self._send_if_transport_is_alive(message.Abort(ApplicationError.NO_AUTH_METHOD, message='no suitable authmethod found')) @inlineCallbacks def _process_Authenticate(self, msg): @@ -689,55 +738,84 @@ def _process_Authenticate(self, msg): func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, authresult=auth_result) - if isinstance(auth_result, types.Accept): - try: - session = yield self.frontend_accepted(auth_result) - except Exception as e: - self.log.failure() - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='Frontend connection accept failed ({})'.format(e))) - else: - def _on_backend_joined(session, details): - msg = message.Welcome(self._session_id, - ProxyFrontendSession.ROLES, - realm=details.realm, - authid=details.authid, - authrole=details.authrole, - authmethod=auth_result.authmethod, - authprovider=auth_result.authprovider, - authextra=dict(details.authextra or {}, **self._custom_authextra)) - self._backend_session = session - self.transport.send(msg) - self.log.debug( - '{func} proxy frontend session WELCOME: session_id={session_id}, ' - 'session={session}, msg={msg}', - func=hltype(self._process_Authenticate), - session_id=hlid(self._session_id), - session=self, - msg=msg) - - session.on('join', _on_backend_joined) - elif isinstance(auth_result, types.Deny): - self.transport.send(message.Abort(auth_result.reason, message=auth_result.message)) + # check if client disconnected while we were yielding to authenticator + if not self.transport: + # we have not yet established a backend session, only authenticator session was used + self.log.info('{func} frontend disconnected while processing pending' + ' authentication {pending_auth}: {authresult}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth, + authresult=auth_result) else: - # should not arrive here: logic error - self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', - rtype=hltype(auth_result), - func=hltype(self._process_Authenticate)) - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='internal error: unexpected authenticator return type {}'.format( - type(auth_result)))) + if isinstance(auth_result, types.Accept): + try: + session = yield self.frontend_accepted(auth_result) + except TransportLost: + self.log.info('{func} frontend disconnected while connecting backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + except Exception as e: + self.log.failure() + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='Frontend connection accept failed ({})'.format(e))) + else: + if self.transport is None: + # we have not yet established a backend session, only authenticator session was used + self.log.info('{func} frontend disconnected connecting backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + self._controller.unmap_backend(self, session) + self._backend_session = None + else: + def _on_backend_joined(session, details): + msg = message.Welcome(self._session_id, + ProxyFrontendSession.ROLES, + realm=details.realm, + authid=details.authid, + authrole=details.authrole, + authmethod=auth_result.authmethod, + authprovider=auth_result.authprovider, + authextra=dict(details.authextra or {}, **self._custom_authextra)) + if self.transport: + self._backend_session = session + self.transport.send(msg) + self.log.debug( + '{func} proxy frontend session WELCOME: session_id={session_id}, ' + 'session={session}, msg={msg}', + func=hltype(self._process_Authenticate), + session_id=hlid(self._session_id), + session=self, + msg=msg) + else: + self.log.info( + '{func} frontend disconnected while joining backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + self._controller.unmap_backend(self, session) + self._backend_session = None + + session.on('join', _on_backend_joined) + elif isinstance(auth_result, types.Deny): + self._send_if_transport_is_alive(message.Abort(auth_result.reason, message=auth_result.message)) + else: + # should not arrive here: logic error + self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', + rtype=hltype(auth_result), + func=hltype(self._process_Authenticate)) + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='internal error: unexpected authenticator return type {}'.format( + type(auth_result)))) else: # should not arrive here: logic error - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='internal error: unexpected pending authentication')) else: # should not arrive here: client misbehaving! - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='no pending authentication')) @@ -1428,7 +1506,6 @@ def stop(self): topic = '{}.on_proxy_connection_stopped'.format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) - class ProxyController(TransportController): """ Controller for proxy workers. Manages: @@ -1498,6 +1575,7 @@ def __init__(self, config=None, reactor=None, personality=None): # map: (realm_name, role_name) -> ProxyRoute self._service_sessions = {} + def has_realm(self, realm: str) -> bool: """ Check if a route to a realm with the given name is currently running. @@ -1592,12 +1670,18 @@ def get_service_session(self, realm: str, authrole: str) -> ApplicationSession: backend_config = self.get_backend_config(realm, authrole) # create and store a new service session connected to the backend router worker - self._service_sessions[realm][authrole] = yield make_service_session( - self._reactor, self, backend_config, realm, authrole) + self._service_sessions[realm][authrole] = make_service_session(self._reactor, self, backend_config, realm, authrole) else: # mark as non-existing! self._service_sessions[realm][authrole] = None + if self._service_sessions[realm] and self._service_sessions[realm][authrole]: + service_session_or_deferred = self._service_sessions[realm][authrole] + if isinstance(service_session_or_deferred, Deferred): + service_session = yield service_session_or_deferred + if service_session is not None: + self._service_sessions[realm][authrole] = service_session + # return cached service session if self._service_sessions[realm] and self._service_sessions[realm][authrole]: service_session = self._service_sessions[realm][authrole] From c4d6e4d7ab855fb7f7a44ae43e2d8a5566940e76 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Wed, 6 Sep 2023 11:29:59 -0400 Subject: [PATCH 2/6] Cancel invocation timeout timers on caller/callee disconnect (cherry picked from commit 9da5aba37b446cd23a8fc798f1f6e4be0d43b5bb) --- crossbar/router/dealer.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crossbar/router/dealer.py b/crossbar/router/dealer.py index 24fd99abe..587d21ca8 100644 --- a/crossbar/router/dealer.py +++ b/crossbar/router/dealer.py @@ -211,6 +211,10 @@ def detach(self, session): session=session._session_id, ) + if invoke.timeout_call: + invoke.timeout_call.cancel() + invoke.timeout_call = None + invokes = self._callee_to_invocations[callee] invokes.remove(invoke) if not invokes: @@ -249,6 +253,10 @@ def detach(self, session): if invoke.caller._transport: invoke.caller._transport.send(reply) + if invoke.timeout_call: + invoke.timeout_call.cancel() + invoke.timeout_call = None + invokes = self._caller_to_invocations[invoke.caller] invokes.remove(invoke) if not invokes: From 589a81c4a3437526d7ef7cbc87ae8330bdffd627 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Thu, 7 Sep 2023 14:49:47 -0400 Subject: [PATCH 3/6] Allow workers to use specified reactor --- crossbar/worker/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crossbar/worker/main.py b/crossbar/worker/main.py index 83427b37b..f4b80cd5c 100644 --- a/crossbar/worker/main.py +++ b/crossbar/worker/main.py @@ -123,7 +123,7 @@ def _run_command_exec_worker(options, reactor=None, personality=None): # we use an Autobahn utility to import the "best" available Twisted reactor from autobahn.twisted.choosereactor import install_reactor - reactor = install_reactor(options.reactor) + reactor = install_reactor(explicit_reactor=options.reactor or os.environ.get('CROSSBAR_REACTOR', None)) # make sure logging to something else than stdio is setup _first_ from crossbar._logging import make_JSON_observer, cb_logging_aware From 137d46acec90a5a2c9946a59312bc9e66ecc97a0 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Tue, 2 Jan 2024 12:16:30 -0500 Subject: [PATCH 4/6] Pin eth-utils package --- crossbar/__init__.py | 4 ++++ requirements-latest.txt | 1 + requirements-pinned.txt | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/crossbar/__init__.py b/crossbar/__init__.py index 0ed50ac37..bfe9db6cc 100644 --- a/crossbar/__init__.py +++ b/crossbar/__init__.py @@ -22,6 +22,10 @@ if not hasattr(eth_abi, 'encode_single') and hasattr(eth_abi, 'encode'): eth_abi.encode_single = eth_abi.encode +import eth_typing +if not hasattr(eth_typing, 'ChainID') and hasattr(eth_typing, 'ChainId'): + eth_typing.ChainID = eth_typing.ChainId + # monkey patch web3 for master branch / upcoming v6 (which we need for python 3.11) # AttributeError: type object 'Web3' has no attribute 'toChecksumAddress'. Did you mean: 'to_checksum_address'? import web3 diff --git a/requirements-latest.txt b/requirements-latest.txt index 395518aca..266c92ec8 100644 --- a/requirements-latest.txt +++ b/requirements-latest.txt @@ -16,6 +16,7 @@ eth-abi @ git+https://github.com/ethereum/eth-abi.git@v4.0.0-beta.2#egg=eth-abi # required for python 3.11+ (https://github.com/ethereum/eth-account/pull/212) eth-account @ git+https://github.com/crossbario/eth-account.git@fix-215#egg=eth-account eth-typing @ git+https://github.com/ethereum/eth-typing.git@v3.2.0#egg=eth-typing +eth-utils==2.2.1 flask>=2.2.2 py-cid @ git+https://github.com/crossbario/py-cid.git@remove-dep-upper-limits#egg=py-cid py-multihash @ git+https://github.com/crossbario/py-multihash.git@remove-dep-version-limits#egg=py-multihash diff --git a/requirements-pinned.txt b/requirements-pinned.txt index de8e40a40..3892517aa 100644 --- a/requirements-pinned.txt +++ b/requirements-pinned.txt @@ -35,7 +35,7 @@ eth-keyfile==0.6.0 eth-keys==0.4.0 eth-rlp==0.3.0 eth-typing @ git+https://github.com/ethereum/eth-typing.git@c93c57f7331c8e515cf72e5893bf2c9cec7e5c69 -eth-utils==2.1.0 +eth-utils==2.2.1 Flask==2.2.2 flatbuffers==23.1.21 frozenlist==1.3.3 From 15ee8cd8fa3a3465270dd2f1a02ef423be207738 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Thu, 1 Feb 2024 20:46:50 -0500 Subject: [PATCH 5/6] Test fixes attempt #1 --- crossbar/router/dealer.py | 2 +- crossbar/router/session.py | 3 ++- crossbar/router/test/test_dealer.py | 7 +++++++ crossbar/worker/proxy.py | 20 ++++++++++---------- mypy.ini | 2 +- requirements-dev.txt | 2 +- requirements-latest.txt | 2 +- tox.ini | 4 ++-- 8 files changed, 25 insertions(+), 17 deletions(-) diff --git a/crossbar/router/dealer.py b/crossbar/router/dealer.py index 587d21ca8..a4adb81cb 100644 --- a/crossbar/router/dealer.py +++ b/crossbar/router/dealer.py @@ -214,7 +214,7 @@ def detach(self, session): if invoke.timeout_call: invoke.timeout_call.cancel() invoke.timeout_call = None - + invokes = self._callee_to_invocations[callee] invokes.remove(invoke) if not invokes: diff --git a/crossbar/router/session.py b/crossbar/router/session.py index 964fe7da2..52267f646 100644 --- a/crossbar/router/session.py +++ b/crossbar/router/session.py @@ -736,12 +736,13 @@ def onClose(self, wasClean): self.onLeave(CloseDetails()) except Exception: self.log.failure("Exception raised in onLeave callback") + self.log.warn("{tb}".format(tb=Failure().getTraceback())) try: self._router.detach(self) except Exception as e: self.log.error("Failed to detach session '{}': {}".format(self._session_id, e)) - self.log.debug("{tb}".format(tb=Failure().getTraceback())) + self.log.warn("{tb}".format(tb=Failure().getTraceback())) self._session_id = None diff --git a/crossbar/router/test/test_dealer.py b/crossbar/router/test/test_dealer.py index 0dafe76e5..df2a9f4c2 100644 --- a/crossbar/router/test/test_dealer.py +++ b/crossbar/router/test/test_dealer.py @@ -111,10 +111,17 @@ def test_outstanding_invoke_but_caller_gone(self): outstanding = mock.Mock() outstanding.call.request = 1 + # there was a bug where timeout calls were not getting cancelled + # mock has non-null timeout_call, so we need to set it to None + outstanding.timeout_call = None dealer = self.router._dealer dealer.attach(session) + # All four maps involved in invocation tracking must be updated atomically + dealer._caller_to_invocations[outstanding.caller] = [outstanding] dealer._callee_to_invocations[session] = [outstanding] + dealer._invocations[outstanding.id] = outstanding + dealer._invocations_by_call[(outstanding.caller_session_id, outstanding.call.request)] = outstanding # pretend we've disconnected already outstanding.caller._transport = None diff --git a/crossbar/worker/proxy.py b/crossbar/worker/proxy.py index 476ccd9f6..cd83edf2e 100644 --- a/crossbar/worker/proxy.py +++ b/crossbar/worker/proxy.py @@ -634,7 +634,7 @@ def _process_Hello(self, msg): authmethod=hlval(authmethod), hello_result=hello_result) - #check if client disconnected while we were yielding to authenticator + # check if client disconnected while we were yielding to authenticator if not self.transport: self.log.debug( '{func} proxy frontend disconnected while processing hello in authenticator:' @@ -661,11 +661,11 @@ def _process_Hello(self, msg): if not self.transport: # we have not yet established a backend session, only authenticator session was used self.log.debug('{func} proxy frontend disconnected while connecting backend session' - ': session_id={session_id}, session={session}, details="{details}"', - func=hltype(self._process_Hello), - session_id=hlid(self._session_id), - session=self, - details=details) + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) self._controller.unmap_backend(self, session) self._backend_session = None else: @@ -744,9 +744,9 @@ def _process_Authenticate(self, msg): # we have not yet established a backend session, only authenticator session was used self.log.info('{func} frontend disconnected while processing pending' ' authentication {pending_auth}: {authresult}', - func=hltype(self._process_Authenticate), - pending_auth=self._pending_auth, - authresult=auth_result) + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth, + authresult=auth_result) else: if isinstance(auth_result, types.Accept): try: @@ -1506,6 +1506,7 @@ def stop(self): topic = '{}.on_proxy_connection_stopped'.format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) + class ProxyController(TransportController): """ Controller for proxy workers. Manages: @@ -1575,7 +1576,6 @@ def __init__(self, config=None, reactor=None, personality=None): # map: (realm_name, role_name) -> ProxyRoute self._service_sessions = {} - def has_realm(self, realm: str) -> bool: """ Check if a route to a realm with the given name is currently running. diff --git a/mypy.ini b/mypy.ini index d2ae62f24..f1020ef82 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version = 3.7 +python_version = 3.11 disable_error_code = annotation-unchecked [mypy-crossbar.worker.test.examples.*] diff --git a/requirements-dev.txt b/requirements-dev.txt index cc5bba177..4d1fc0c97 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -58,7 +58,7 @@ wheel twine hashin -pyinstaller==4.2 +pyinstaller>=4.2 # FIXME: the docker shit insists on old deps (yaml, jsonschema) # https://github.com/docker/compose/blob/30fcb72cf3b136598883752edfa6ea4f3b8643d4/setup.py#L27 diff --git a/requirements-latest.txt b/requirements-latest.txt index 266c92ec8..7ceb4cf7e 100644 --- a/requirements-latest.txt +++ b/requirements-latest.txt @@ -12,7 +12,7 @@ cookiecutter>=2.1.1 cryptography>=39.0.0 docker>=6.0.1 # required for python 3.11+ https://github.com/ethereum/eth-abi/pull/194 -eth-abi @ git+https://github.com/ethereum/eth-abi.git@v4.0.0-beta.2#egg=eth-abi +eth-abi==4.2.1 # required for python 3.11+ (https://github.com/ethereum/eth-account/pull/212) eth-account @ git+https://github.com/crossbario/eth-account.git@fix-215#egg=eth-account eth-typing @ git+https://github.com/ethereum/eth-typing.git@v3.2.0#egg=eth-typing diff --git a/tox.ini b/tox.ini index 2868b6490..5cb2c0042 100644 --- a/tox.ini +++ b/tox.ini @@ -162,7 +162,7 @@ exclude = crossbar/worker/test/examples/syntaxerror.py deps = bandit commands = - bandit -r -s B101,B110,B311 \ + bandit -r -s B101,B110,B311,B113 \ -x crossbar/common/key.py,crossbar/bridge/mqtt/test/test_wamp.py,crossbar/bridge/rest/test/__init__.py,crossbar/bridge/mqtt/test/test_wamp.py,crossbar/webservice/misc.py \ crossbar @@ -176,7 +176,7 @@ deps = commands = flake8 \ --exclude crossbar/shell/reflection \ - --ignore=E501,E402,E722,E741,W503,W504,E126,E251 \ + --ignore=E501,E402,E722,E741,W503,W504,E126,E251,E721 \ crossbar From 6debfe13e5e39840046f691a1ff271628104784f Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Tue, 7 May 2024 16:19:49 -0400 Subject: [PATCH 6/6] Allow proxy tests to run on platform without shared socket support --- test/functests/cbtests/test_cb_proxy.py | 60 ++++++++++++++++++++----- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/test/functests/cbtests/test_cb_proxy.py b/test/functests/cbtests/test_cb_proxy.py index a1da50106..f6c8bc28d 100644 --- a/test/functests/cbtests/test_cb_proxy.py +++ b/test/functests/cbtests/test_cb_proxy.py @@ -9,6 +9,12 @@ import os import re +import socket +import sys +import platform + +from twisted.internet import fdesc, tcp, ssl +from twisted.python.runtime import platformType from os.path import join from functools import partial @@ -26,6 +32,39 @@ # twice. from ..helpers import _cleanup_crossbar, start_crossbar, functest_session +# Allow to test on macOS and other platforms without shared ports + +_HAS_SHARED_LOADBALANCED_SOCKET = False + +if sys.platform.startswith('linux'): + try: + # get Linux kernel version, like: (3, 19) + _LINUX_KERNEL_VERSION = [int(x) for x in tuple(sys.platform.uname()[2].split('.')[:2])] + + # SO_REUSEPORT only supported for Linux kernels >= 3.9 + if (_LINUX_KERNEL_VERSION[0] == 3 and _LINUX_KERNEL_VERSION[1] >= 9) or _LINUX_KERNEL_VERSION[0] >= 4: + _HAS_SHARED_LOADBALANCED_SOCKET = True + + # monkey patch missing constant if needed + if not hasattr(socket, 'SO_REUSEPORT'): + socket.SO_REUSEPORT = 15 + except: + pass + +elif sys.platform == 'win32': + # http://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t/14388707#14388707 + _HAS_SHARED_LOADBALANCED_SOCKET = True + + +def shared_port(): + return _HAS_SHARED_LOADBALANCED_SOCKET + + +def proxy_port(proxy_ordinal): + if _HAS_SHARED_LOADBALANCED_SOCKET: + return 8443 + else: + return 8443 + proxy_ordinal @inlineCallbacks @@ -143,8 +182,8 @@ def test_proxy(request, virtualenv, reactor, session_temp): "id": "ws_test_0", "endpoint": { "type": "tcp", - "port": 8443, - "shared": True, + "port": proxy_port(0), + "shared": shared_port(), }, "paths": { "autobahn": { @@ -236,8 +275,8 @@ def test_proxy(request, virtualenv, reactor, session_temp): "type": "web", "endpoint": { "type": "tcp", - "port": 8443, - "shared": True, + "port": proxy_port(1), + "shared": shared_port(), }, "paths": { "autobahn": { @@ -303,6 +342,7 @@ class WaitForTransportAndProxy(object): Super hacky, but ... other suggestions? Could busy-wait for ports to become connect()-able? Better text to search for? """ + def __init__(self, done): self.data = '' self.done = done @@ -332,11 +372,11 @@ def write(self, data): listening = Deferred() protocol = yield start_crossbar( - reactor, virtualenv, - cbdir, crossbar_config, - stdout=WaitForTransportAndProxy(listening), - stderr=WaitForTransportAndProxy(listening), - log_level='debug' if request.config.getoption('logdebug', False) else False, + reactor, virtualenv, + cbdir, crossbar_config, + stdout=WaitForTransportAndProxy(listening), + stderr=WaitForTransportAndProxy(listening), + log_level='debug' if request.config.getoption('logdebug', False) else False, ) request.addfinalizer(partial(_cleanup_crossbar, protocol)) @@ -386,6 +426,7 @@ def call_test(*args, **kw): @callee.on_ready def _(session): callee_ready.callback(None) + callee.start() yield callee_ready @@ -394,7 +435,6 @@ def _(session): caller_sessions = [] results = [] for _ in range(num_callees): - @inlineCallbacks def main(reactor, session): # print("main: {} {}".format(reactor, session))