Skip to content

Commit

Permalink
Multiple resource leaks fixed in dealer and proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
DZabavchik committed Sep 6, 2023
1 parent 2b8ae44 commit 8515d8e
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 106 deletions.
46 changes: 35 additions & 11 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def detach(self, session):
is_rlink_session = (session._authrole == "rlink")
if session in self._caller_to_invocations:

# this needs to update all four places where we track invocations similar to _remove_invoke_request
outstanding = self._caller_to_invocations.get(session, [])
for invoke in outstanding: # type: InvocationRequest
if invoke.canceled:
Expand All @@ -209,11 +210,22 @@ def detach(self, session):
request=invoke.id,
session=session._session_id,
)

invokes = self._callee_to_invocations[callee]
invokes.remove(invoke)
if not invokes:
del self._callee_to_invocations[callee]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

self._router.send(invoke.callee, message.Interrupt(
request=invoke.id,
mode=message.Cancel.KILLNOWAIT,
))

del self._caller_to_invocations[session]

if session in self._session_to_registrations:

# send out Errors for any in-flight calls we have
Expand All @@ -237,6 +249,17 @@ def detach(self, session):
if invoke.caller._transport:
invoke.caller._transport.send(reply)

invokes = self._caller_to_invocations[invoke.caller]
invokes.remove(invoke)
if not invokes:
del self._caller_to_invocations[invoke.caller]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

if outstanding:
del self._callee_to_invocations[session]

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)

Expand Down Expand Up @@ -1145,19 +1168,20 @@ def _remove_invoke_request(self, invocation_request):
invocation_request.timeout_call.cancel()
invocation_request.timeout_call = None

invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]
# all four places should always be updated together
if invocation_request.id in self._invocations:
del self._invocations[invocation_request.id]
invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

del self._invocations[invocation_request.id]
invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]

del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]
del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]

# noinspection PyUnusedLocal
def processCancel(self, session, cancel):
Expand Down
Loading

0 comments on commit 8515d8e

Please sign in to comment.