Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert receipts and events databases to async/await. (#8076)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Aug 14, 2020
1 parent dc22090 commit e886195
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 82 deletions.
1 change: 1 addition & 0 deletions changelog.d/8076.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
33 changes: 14 additions & 19 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple

import attr
from prometheus_client import Counter

from twisted.internet import defer

import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
Expand Down Expand Up @@ -113,15 +111,14 @@ def __init__(
hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"

@defer.inlineCallbacks
def _persist_events_and_state_updates(
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
):
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -136,7 +133,7 @@ def _persist_events_and_state_updates(
backfilled
Returns:
Deferred: resolves when the events have been persisted
Resolves when the events have been persisted
"""

# We want to calculate the stream orderings as late as possible, as
Expand Down Expand Up @@ -168,7 +165,7 @@ def _persist_events_and_state_updates(
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream

yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
Expand Down Expand Up @@ -206,16 +203,15 @@ def _persist_events_and_state_updates(
(room_id,), list(latest_event_ids)
)

@defer.inlineCallbacks
def _get_events_which_are_prevs(self, event_ids):
async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
"""Filter the supplied list of event_ids to get those which are prev_events of
existing (non-outlier/rejected) events.
Args:
event_ids (Iterable[str]): event ids to filter
event_ids: event ids to filter
Returns:
Deferred[List[str]]: filtered event ids
Filtered event ids
"""
results = []

Expand All @@ -240,14 +236,13 @@ def _get_events_which_are_prevs_txn(txn, batch):
results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))

for chunk in batch_iter(event_ids, 100):
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)

return results

@defer.inlineCallbacks
def _get_prevs_before_rejected(self, event_ids):
async def _get_prevs_before_rejected(self, event_ids: Iterable[str]) -> Set[str]:
"""Get soft-failed ancestors to remove from the extremities.
Given a set of events, find all those that have been soft-failed or
Expand All @@ -259,11 +254,11 @@ def _get_prevs_before_rejected(self, event_ids):
are separated by soft failed events.
Args:
event_ids (Iterable[str]): Events to find prev events for. Note
that these must have already been persisted.
event_ids: Events to find prev events for. Note that these must have
already been persisted.
Returns:
Deferred[set[str]]
The previous events.
"""

# The set of event_ids to return. This includes all soft-failed events
Expand Down Expand Up @@ -304,7 +299,7 @@ def _get_prevs_before_rejected_txn(txn, batch):
existing_prevs.add(prev_event_id)

for chunk in batch_iter(event_ids, 100):
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)

Expand Down
46 changes: 19 additions & 27 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import logging

from twisted.internet import defer

from synapse.api.constants import EventContentFields
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
Expand Down Expand Up @@ -94,8 +92,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
where_clause="NOT have_censored",
)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
Expand Down Expand Up @@ -155,19 +152,18 @@ def reindex_txn(txn):

return len(rows)

result = yield self.db_pool.runInteraction(
result = await self.db_pool.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)

if not result:
yield self.db_pool.updates._end_background_update(
await self.db_pool.updates._end_background_update(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
)

return result

@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
async def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
Expand Down Expand Up @@ -234,19 +230,18 @@ def reindex_search_txn(txn):

return len(rows_to_update)

result = yield self.db_pool.runInteraction(
result = await self.db_pool.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)

if not result:
yield self.db_pool.updates._end_background_update(
await self.db_pool.updates._end_background_update(
self.EVENT_ORIGIN_SERVER_TS_NAME
)

return result

@defer.inlineCallbacks
def _cleanup_extremities_bg_update(self, progress, batch_size):
async def _cleanup_extremities_bg_update(self, progress, batch_size):
"""Background update to clean out extremities that should have been
deleted previously.
Expand Down Expand Up @@ -414,26 +409,25 @@ def _cleanup_extremities_bg_update_txn(txn):

return len(original_set)

num_handled = yield self.db_pool.runInteraction(
num_handled = await self.db_pool.runInteraction(
"_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
)

if not num_handled:
yield self.db_pool.updates._end_background_update(
await self.db_pool.updates._end_background_update(
self.DELETE_SOFT_FAILED_EXTREMITIES
)

def _drop_table_txn(txn):
txn.execute("DROP TABLE _extremities_to_check")

yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_cleanup_extremities_bg_update_drop_table", _drop_table_txn
)

return num_handled

@defer.inlineCallbacks
def _redactions_received_ts(self, progress, batch_size):
async def _redactions_received_ts(self, progress, batch_size):
"""Handles filling out the `received_ts` column in redactions.
"""
last_event_id = progress.get("last_event_id", "")
Expand Down Expand Up @@ -480,17 +474,16 @@ def _redactions_received_ts_txn(txn):

return len(rows)

count = yield self.db_pool.runInteraction(
count = await self.db_pool.runInteraction(
"_redactions_received_ts", _redactions_received_ts_txn
)

if not count:
yield self.db_pool.updates._end_background_update("redactions_received_ts")
await self.db_pool.updates._end_background_update("redactions_received_ts")

return count

@defer.inlineCallbacks
def _event_fix_redactions_bytes(self, progress, batch_size):
async def _event_fix_redactions_bytes(self, progress, batch_size):
"""Undoes hex encoded censored redacted event JSON.
"""

Expand All @@ -511,16 +504,15 @@ def _event_fix_redactions_bytes_txn(txn):

txn.execute("DROP INDEX redactions_censored_redacts")

yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
)

yield self.db_pool.updates._end_background_update("event_fix_redactions_bytes")
await self.db_pool.updates._end_background_update("event_fix_redactions_bytes")

return 1

@defer.inlineCallbacks
def _event_store_labels(self, progress, batch_size):
async def _event_store_labels(self, progress, batch_size):
"""Background update handler which will store labels for existing events."""
last_event_id = progress.get("last_event_id", "")

Expand Down Expand Up @@ -575,11 +567,11 @@ def _event_store_labels_txn(txn):

return nbrows

num_rows = yield self.db_pool.runInteraction(
num_rows = await self.db_pool.runInteraction(
desc="event_store_labels", func=_event_store_labels_txn
)

if not num_rows:
yield self.db_pool.updates._end_background_update("event_store_labels")
await self.db_pool.updates._end_background_update("event_store_labels")

return num_rows
Loading

0 comments on commit e886195

Please sign in to comment.