Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix linearizer cancellation on twisted < 18.7
Browse files Browse the repository at this point in the history
Turns out that cancellation of inlineDeferreds didn't really work properly
until Twisted 18.7. This commit refactors Linearizer.queue to avoid
inlineCallbacks.
  • Loading branch information
richvdh committed Aug 10, 2018
1 parent 67dbe4c commit 638d35e
Showing 1 changed file with 68 additions and 43 deletions.
111 changes: 68 additions & 43 deletions synapse/util/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,62 +188,30 @@ def __init__(self, name=None, max_count=1, clock=None):
# things blocked from executing.
self.key_to_defer = {}

@defer.inlineCallbacks
def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
# propagated inside inlineCallbacks until Twisted 18.7)
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])

# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1][new_defer] = 1

logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
yield make_deferred_yieldable(new_defer)
except Exception as e:
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)

# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
raise

logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)

res = self._await_lock(key)
else:
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
res = defer.succeed(None)

# once we successfully get the lock, we need to return a context manager which
# will release the lock.

@contextmanager
def _ctx_manager():
def _ctx_manager(_):
try:
yield
finally:
Expand All @@ -264,7 +232,64 @@ def _ctx_manager():
# map.
del self.key_to_defer[key]

defer.returnValue(_ctx_manager())
res.addCallback(_ctx_manager)
return res

def _await_lock(self, key):
"""Helper for queue: adds a deferred to the queue
Assumes that we've already checked that we've reached the limit of the number
of lock-holders we allow. Creates a new deferred which is added to the list, and
adds some management around cancellations.
Returns the deferred, which will callback once we have secured the lock.
"""
entry = self.key_to_defer[key]

logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)

new_defer = make_deferred_yieldable(defer.Deferred())
entry[1][new_defer] = 1

def cb(_r):
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
return self._clock.sleep(0)

def eb(e):
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)

else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)

# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
return e

new_defer.addCallbacks(cb, eb)
return new_defer


class ReadWriteLock(object):
Expand Down

0 comments on commit 638d35e

Please sign in to comment.