This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add cancellation support to ReadWriteLock (#12120)
Also convert `ReadWriteLock` to use async context managers.

Signed-off-by: Sean Quah <seanq@element.io>
squahtx authored Mar 14, 2022
1 parent 8e5706d · commit 605d161
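
The call-site change is mechanical, as the pagination.py hunks below show: lock acquisition moves from an awaited plain context manager into a single async context manager, which is what lets the lock handle a cancellation that arrives while a caller is still waiting to acquire it. In miniature:

    # Before this commit: acquire by awaiting, then hold via a plain
    # context manager.
    with await self.pagination_lock.read(room_id):
        ...

    # After: one async context manager performs the (cancellable) wait
    # and the release.
    async with self.pagination_lock.read(room_id):
        ...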
Showing 4 changed files with 382 additions and 93 deletions.
changelog.d/12120.misc (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
+Add support for cancellation to `ReadWriteLock`.
synapse/handlers/pagination.py (8 changes: 4 additions & 4 deletions)
@@ -350,7 +350,7 @@ async def _purge_history(
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with await self.pagination_lock.write(room_id):
+            async with self.pagination_lock.write(room_id):
                 await self.storage.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
@@ -406,7 +406,7 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
             room_id: room to be purged
             force: set true to skip checking for joined users.
         """
-        with await self.pagination_lock.write(room_id):
+        async with self.pagination_lock.write(room_id):
             # first check that we have no users in this room
             if not force:
                 joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -448,7 +448,7 @@ async def get_messages(

         room_token = from_token.room_key

-        with await self.pagination_lock.read(room_id):
+        async with self.pagination_lock.read(room_id):
             (
                 membership,
                 member_event_id,
@@ -615,7 +615,7 @@ async def _shutdown_and_purge_room(

         self._purges_in_progress_by_room.add(room_id)
         try:
-            with await self.pagination_lock.write(room_id):
+            async with self.pagination_lock.write(room_id):
                 self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
                 self._delete_by_id[
                     delete_id
synapse/util/async_helpers.py (71 changes: 40 additions & 31 deletions)
@@ -18,9 +18,10 @@
 import inspect
 import itertools
 import logging
-from contextlib import contextmanager
+from contextlib import asynccontextmanager, contextmanager
 from typing import (
     Any,
+    AsyncIterator,
     Awaitable,
     Callable,
     Collection,
@@ -40,7 +41,7 @@
 )

 import attr
-from typing_extensions import ContextManager, Literal
+from typing_extensions import AsyncContextManager, Literal

 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
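
For reference, `contextlib.asynccontextmanager` (stdlib) turns an async generator into a factory of async context managers; the rewritten `read` and `write` below follow this shape. A minimal standalone illustration (`guarded` is a made-up name for the example):

    from contextlib import asynccontextmanager
    from typing import AsyncIterator

    @asynccontextmanager
    async def guarded() -> AsyncIterator[None]:
        print("acquire")  # entry: runs when the `async with` is entered
        try:
            yield         # the `async with` body runs at this point
        finally:
            print("release")  # exit: runs even if the body raised

    # Usage:
    #     async with guarded():
    #         ...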
@@ -491,7 +492,7 @@ class ReadWriteLock:

     Example:

-        with await read_write_lock.read("test_key"):
+        async with read_write_lock.read("test_key"):
             # do some work
     """

@@ -514,22 +515,24 @@ def __init__(self) -> None:
         # Latest writer queued
         self.key_to_current_writer: Dict[str, defer.Deferred] = {}

-    async def read(self, key: str) -> ContextManager:
-        new_defer: "defer.Deferred[None]" = defer.Deferred()
+    def read(self, key: str) -> AsyncContextManager:
+        @asynccontextmanager
+        async def _ctx_manager() -> AsyncIterator[None]:
+            new_defer: "defer.Deferred[None]" = defer.Deferred()

-        curr_readers = self.key_to_current_readers.setdefault(key, set())
-        curr_writer = self.key_to_current_writer.get(key, None)
+            curr_readers = self.key_to_current_readers.setdefault(key, set())
+            curr_writer = self.key_to_current_writer.get(key, None)

-        curr_readers.add(new_defer)
+            curr_readers.add(new_defer)

-        # We wait for the latest writer to finish writing. We can safely ignore
-        # any existing readers... as they're readers.
-        if curr_writer:
-            await make_deferred_yieldable(curr_writer)
-
-        @contextmanager
-        def _ctx_manager() -> Iterator[None]:
             try:
+                # We wait for the latest writer to finish writing. We can safely ignore
+                # any existing readers... as they're readers.
+                # May raise a `CancelledError` if the `Deferred` wrapping us is
+                # cancelled. The `Deferred` we are waiting on must not be cancelled,
+                # since we do not own it.
+                if curr_writer:
+                    await make_deferred_yieldable(stop_cancellation(curr_writer))
                 yield
             finally:
                 with PreserveLoggingContext():
@@ -538,29 +541,35 @@ def _ctx_manager() -> Iterator[None]:

         return _ctx_manager()
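
Here `stop_cancellation` is a helper defined elsewhere in `synapse/util/async_helpers.py`: it wraps a `Deferred` so that cancelling the wrapper raises `CancelledError` in the waiting reader without cancelling the writer's `Deferred`, which the lock does not own. A minimal sketch of those semantics in plain Twisted (`stop_cancellation_sketch` is a hypothetical stand-in, not the real helper):

    from twisted.internet import defer

    def stop_cancellation_sketch(wrapped: "defer.Deferred") -> "defer.Deferred":
        # `wrapper` holds no reference back to `wrapped`, so cancelling it
        # merely errbacks `wrapper` with CancelledError; `wrapped` is untouched.
        wrapper: "defer.Deferred" = defer.Deferred(lambda d: None)

        def _mirror(result):
            if not wrapper.called:  # skip if `wrapper` was already cancelled
                wrapper.callback(result)
            return result  # leave `wrapped`'s own callback chain intact

        wrapped.addBoth(_mirror)
        return wrapper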

-    async def write(self, key: str) -> ContextManager:
-        new_defer: "defer.Deferred[None]" = defer.Deferred()
+    def write(self, key: str) -> AsyncContextManager:
+        @asynccontextmanager
+        async def _ctx_manager() -> AsyncIterator[None]:
+            new_defer: "defer.Deferred[None]" = defer.Deferred()

-        curr_readers = self.key_to_current_readers.get(key, set())
-        curr_writer = self.key_to_current_writer.get(key, None)
+            curr_readers = self.key_to_current_readers.get(key, set())
+            curr_writer = self.key_to_current_writer.get(key, None)

-        # We wait on all latest readers and writer.
-        to_wait_on = list(curr_readers)
-        if curr_writer:
-            to_wait_on.append(curr_writer)
+            # We wait on all latest readers and writer.
+            to_wait_on = list(curr_readers)
+            if curr_writer:
+                to_wait_on.append(curr_writer)

-        # We can clear the list of current readers since the new writer waits
-        # for them to finish.
-        curr_readers.clear()
-        self.key_to_current_writer[key] = new_defer
+            # We can clear the list of current readers since `new_defer` waits
+            # for them to finish.
+            curr_readers.clear()
+            self.key_to_current_writer[key] = new_defer

-        await make_deferred_yieldable(defer.gatherResults(to_wait_on))
-
-        @contextmanager
-        def _ctx_manager() -> Iterator[None]:
+            to_wait_on_defer = defer.gatherResults(to_wait_on)
             try:
+                # Wait for all current readers and the latest writer to finish.
+                # May raise a `CancelledError` immediately after the wait if the
+                # `Deferred` wrapping us is cancelled. We must only release the lock
+                # once we have acquired it, hence the use of `delay_cancellation`
+                # rather than `stop_cancellation`.
+                await make_deferred_yieldable(delay_cancellation(to_wait_on_defer))
                 yield
             finally:
                 # Release the lock.
                 with PreserveLoggingContext():
                     new_defer.callback(None)
                 # `self.key_to_current_writer[key]` may be missing if there was another
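
`delay_cancellation`, the counterpart used here, is stricter than `stop_cancellation`: a cancellation that arrives during the wait is not delivered until `to_wait_on_defer` resolves, so the writer only observes the `CancelledError` once it actually holds the lock and the `finally` block can release it. A sketch of those semantics, again with a hypothetical stand-in rather than the real helper:

    from twisted.internet import defer
    from twisted.internet.defer import CancelledError

    def delay_cancellation_sketch(wrapped: "defer.Deferred") -> "defer.Deferred":
        def _canceller(d: "defer.Deferred") -> None:
            # Record the CancelledError now, but pause its delivery until
            # `wrapped` has resolved.
            d.pause()
            d.errback(CancelledError())

            def _deliver(result):
                d.unpause()  # the CancelledError now reaches the waiter
                return result

            wrapped.addBoth(_deliver)

        wrapper: "defer.Deferred" = defer.Deferred(_canceller)

        def _mirror(result):
            if not wrapper.called:  # fire normally unless already cancelled
                wrapper.callback(result)
            return result

        wrapped.addBoth(_mirror)
        return wrapper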