Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple resource leaks fixed in proxy and dealer #2098

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crossbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
if not hasattr(eth_abi, 'encode_single') and hasattr(eth_abi, 'encode'):
eth_abi.encode_single = eth_abi.encode

import eth_typing
if not hasattr(eth_typing, 'ChainID') and hasattr(eth_typing, 'ChainId'):
eth_typing.ChainID = eth_typing.ChainId

# monkey patch web3 for master branch / upcoming v6 (which we need for python 3.11)
# AttributeError: type object 'Web3' has no attribute 'toChecksumAddress'. Did you mean: 'to_checksum_address'?
import web3
Expand Down
60 changes: 45 additions & 15 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class InvocationRequest(object):
'id',
'registration',
'caller',
'caller_session_id',
'call',
'callee',
'forward_for',
Expand All @@ -53,6 +54,7 @@ def __init__(self, id, registration, caller, call, callee, forward_for, authoriz
self.id = id
self.registration = registration
self.caller = caller
self.caller_session_id = caller._session_id
self.call = call
self.callee = callee
self.forward_for = forward_for
Expand Down Expand Up @@ -185,6 +187,7 @@ def detach(self, session):
is_rlink_session = (session._authrole == "rlink")
if session in self._caller_to_invocations:

# this needs to update all four places where we track invocations similar to _remove_invoke_request
outstanding = self._caller_to_invocations.get(session, [])
for invoke in outstanding: # type: InvocationRequest
if invoke.canceled:
Expand All @@ -207,11 +210,26 @@ def detach(self, session):
request=invoke.id,
session=session._session_id,
)

if invoke.timeout_call:
invoke.timeout_call.cancel()
invoke.timeout_call = None

invokes = self._callee_to_invocations[callee]
invokes.remove(invoke)
if not invokes:
del self._callee_to_invocations[callee]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

self._router.send(invoke.callee, message.Interrupt(
request=invoke.id,
mode=message.Cancel.KILLNOWAIT,
))

del self._caller_to_invocations[session]

if session in self._session_to_registrations:

# send out Errors for any in-flight calls we have
Expand All @@ -235,6 +253,21 @@ def detach(self, session):
if invoke.caller._transport:
invoke.caller._transport.send(reply)

if invoke.timeout_call:
invoke.timeout_call.cancel()
invoke.timeout_call = None

invokes = self._caller_to_invocations[invoke.caller]
invokes.remove(invoke)
if not invokes:
del self._caller_to_invocations[invoke.caller]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

if outstanding:
del self._callee_to_invocations[session]

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)

Expand Down Expand Up @@ -1120,23 +1153,20 @@ def _remove_invoke_request(self, invocation_request):
invocation_request.timeout_call.cancel()
invocation_request.timeout_call = None

invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]
# all four places should always be updated together
if invocation_request.id in self._invocations:
del self._invocations[invocation_request.id]
invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

del self._invocations[invocation_request.id]
invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]

# the session_id will be None if the caller session has
# already vanished
caller_id = invocation_request.caller._session_id
if caller_id is not None:
del self._invocations_by_call[caller_id, invocation_request.call.request]
del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]

# noinspection PyUnusedLocal
def processCancel(self, session, cancel):
Expand Down
3 changes: 2 additions & 1 deletion crossbar/router/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,12 +736,13 @@ def onClose(self, wasClean):
self.onLeave(CloseDetails())
except Exception:
self.log.failure("Exception raised in onLeave callback")
self.log.warn("{tb}".format(tb=Failure().getTraceback()))

try:
self._router.detach(self)
except Exception as e:
self.log.error("Failed to detach session '{}': {}".format(self._session_id, e))
self.log.debug("{tb}".format(tb=Failure().getTraceback()))
self.log.warn("{tb}".format(tb=Failure().getTraceback()))

self._session_id = None

Expand Down
7 changes: 7 additions & 0 deletions crossbar/router/test/test_dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,17 @@ def test_outstanding_invoke_but_caller_gone(self):
outstanding = mock.Mock()
outstanding.call.request = 1

# there was a bug where timeout calls were not getting cancelled
# mock has non-null timeout_call, so we need to set it to None
outstanding.timeout_call = None
dealer = self.router._dealer
dealer.attach(session)

# All four maps involved in invocation tracking must be updated atomically
dealer._caller_to_invocations[outstanding.caller] = [outstanding]
dealer._callee_to_invocations[session] = [outstanding]
dealer._invocations[outstanding.id] = outstanding
dealer._invocations_by_call[(outstanding.caller_session_id, outstanding.call.request)] = outstanding
# pretend we've disconnected already
outstanding.caller._transport = None

Expand Down
2 changes: 1 addition & 1 deletion crossbar/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _run_command_exec_worker(options, reactor=None, personality=None):

# we use an Autobahn utility to import the "best" available Twisted reactor
from autobahn.twisted.choosereactor import install_reactor
reactor = install_reactor(options.reactor)
reactor = install_reactor(explicit_reactor=options.reactor or os.environ.get('CROSSBAR_REACTOR', None))

# make sure logging to something else than stdio is setup _first_
from crossbar._logging import make_JSON_observer, cb_logging_aware
Expand Down
Loading
Loading