This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce the number of "untyped defs" #12716

Merged
14 commits merged on May 12, 2022
mypy.ini (3 changes: 3 additions & 0 deletions)
@@ -211,6 +211,9 @@ disallow_untyped_defs = True
[mypy-synapse.storage.databases.main.user_erasure_store]
disallow_untyped_defs = True

+[mypy-synapse.storage.persist_events]
+disallow_untyped_defs = True

[mypy-synapse.storage.types]
disallow_untyped_defs = True

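For readers unfamiliar with the flag: a per-module `disallow_untyped_defs = True` section makes mypy reject any function in that module whose signature is not fully annotated. A minimal sketch of the effect (hypothetical file, not part of this PR):

```python
# demo.py -- illustrative only; run with `mypy --disallow-untyped-defs demo.py`

def untyped(room_id):  # error: Function is missing a type annotation
    return room_id.upper()

def typed(room_id: str) -> str:  # accepted: parameters and return are annotated
    return room_id.upper()
```

This is why the annotations added below in synapse/storage/persist_events.py are needed before its section can be added to mypy.ini.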
synapse/storage/persist_events.py (21 changes: 13 additions & 8 deletions)
@@ -25,6 +25,7 @@
Collection,
Deque,
Dict,
+Generator,
Generic,
Iterable,
List,
@@ -207,7 +208,7 @@ async def add_to_queue(

return res

-def _handle_queue(self, room_id):
+def _handle_queue(self, room_id: str) -> None:
"""Attempts to handle the queue for a room if not already being handled.

The queue's callback will be invoked for each item in the queue,
@@ -227,7 +228,7 @@ def _handle_queue(self, room_id):

self._currently_persisting_rooms.add(room_id)

-async def handle_queue_loop():
+async def handle_queue_loop() -> None:
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
@@ -250,15 +251,17 @@ async def handle_queue_loop():
with PreserveLoggingContext():
item.deferred.callback(ret)
finally:
-queue = self._event_persist_queues.pop(room_id, None)
-if queue:
-self._event_persist_queues[room_id] = queue
+remaining_queue = self._event_persist_queues.pop(room_id, None)
@DMRobertson (Contributor, Author) commented on May 11, 2022:

Wasn't sure of the best name here. Mypy doesn't like it because it has a different type to the queue on line 233.

A Member replied:

Probably fine; other options: to_persist_queue?
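To illustrate the mypy complaint behind the rename (a minimal sketch with stand-in types, not the actual Synapse classes): mypy infers a single type for a local name from its first assignment, so reassigning `queue` (a generator from `_get_drainining_queue`) to the `Optional[Deque[...]]` result of `.pop()` is reported as an incompatible assignment; binding the popped value to a fresh name avoids the clash.

```python
from collections import deque
from typing import Deque, Dict, Generator, Optional

queues: Dict[str, Deque[int]] = {}

def drain(room_id: str) -> Generator[int, None, None]:
    # stand-in for _get_drainining_queue
    yield from queues.setdefault(room_id, deque())

def handle(room_id: str) -> None:
    queue = drain(room_id)  # inferred as Generator[int, None, None]
    for item in queue:
        print(item)
    # Re-using the same name for the popped deque is rejected by mypy with
    # something like:
    #     error: Incompatible types in assignment (expression has type
    #     "Optional[Deque[int]]", variable has type "Generator[int, None, None]")
    # A fresh name type-checks cleanly:
    remaining_queue: Optional[Deque[int]] = queues.pop(room_id, None)
    if remaining_queue:
        queues[room_id] = remaining_queue
```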

+if remaining_queue:
+self._event_persist_queues[room_id] = remaining_queue
self._currently_persisting_rooms.discard(room_id)

# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)

-def _get_drainining_queue(self, room_id):
+def _get_drainining_queue(
+self, room_id: str
+) -> Generator[_EventPersistQueueItem, None, None]:
queue = self._event_persist_queues.setdefault(room_id, deque())

try:
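A side note on the new return annotation: `typing.Generator` takes three parameters, the yield type, the send type, and the return type, so `Generator[_EventPersistQueueItem, None, None]` describes a generator that only yields queue items. For a generator that is only iterated over, `Iterator[_EventPersistQueueItem]` would be an equally valid, shorter spelling; a small sketch with a stand-in element type:

```python
from typing import Generator, Iterator

def count_up(n: int) -> Generator[int, None, None]:
    # yields ints, accepts nothing via send(), returns nothing
    for i in range(n):
        yield i

def count_up_alt(n: int) -> Iterator[int]:
    # equivalent annotation when the generator is only iterated over
    for i in range(n):
        yield i
```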
@@ -317,7 +320,9 @@ async def persist_events(
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))

-async def enqueue(item):
+async def enqueue(
+item: Tuple[str, List[Tuple[EventBase, EventContext]]]
+) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
@@ -1102,7 +1107,7 @@ async def _is_server_still_joined(

return False

-async def _handle_potentially_left_users(self, user_ids: Set[str]):
+async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None:
"""Given a set of remote users check if the server still shares a room with
them. If not then mark those users' device cache as stale.
"""
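As elsewhere in this diff, the explicit `-> None` is what lets a coroutine that returns nothing pass `disallow_untyped_defs`; it also means mypy type-checks the function body instead of skipping it, as it does for unannotated defs by default. A hedged sketch with hypothetical names:

```python
from typing import Set

async def mark_device_cache_stale(user_id: str) -> None:
    ...  # placeholder for the real storage call

async def handle_potentially_left_users(user_ids: Set[str]) -> None:
    # Without "-> None" this def would be rejected under disallow_untyped_defs,
    # and mypy would not type-check the body at all.
    for user_id in user_ids:
        await mark_device_cache_stale(user_id)
```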