Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a DeltaState to track changes to be made to current state (#6716)
Browse files Browse the repository at this point in the history
* commit '0e6876007':
  Add a DeltaState to track changes to be made to current state (#6716)
  • Loading branch information
anoadragon453 committed Mar 23, 2020
2 parents fd3b31d + 0e68760 commit a09011a
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 99 deletions.
1 change: 1 addition & 0 deletions changelog.d/6716.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a `DeltaState` to track changes to be made to current state during event persistence.
87 changes: 43 additions & 44 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
from collections import Counter as c_counter, OrderedDict, namedtuple
from functools import wraps
from typing import Dict, List, Tuple

from six import iteritems, text_type
from six.moves import range
Expand All @@ -41,8 +42,9 @@
from synapse.storage.data_stores.main.event_federation import EventFederationStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
from synapse.storage.database import Database
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.storage.database import Database, LoggingTransaction
from synapse.storage.persist_events import DeltaState
from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter
Expand Down Expand Up @@ -148,30 +150,26 @@ def fetch(txn):
@defer.inlineCallbacks
def _persist_events_and_state_updates(
self,
events_and_contexts,
current_state_for_room,
state_delta_for_room,
new_forward_extremeties,
backfilled=False,
delete_existing=False,
events_and_contexts: List[Tuple[EventBase, EventContext]],
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
delete_existing: bool = False,
):
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
current_state_for_room (dict[str, dict]): Map from room_id to the
current state of the room based on forward extremities
state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
of `(to_delete, to_insert)` where to_delete is a list
of type/state keys to remove from current state, and to_insert
is a map (type,key)->event_id giving the state delta in each
room.
new_forward_extremities (dict[str, list[str]]): Map from room_id
to list of event IDs that are the new forward extremities of
the room.
backfilled (bool)
delete_existing (bool):
events_and_contexts:
current_state_for_room: Map from room_id to the current state of
the room based on forward extremities
state_delta_for_room: Map from room_id to the delta to apply to
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
delete_existing
Returns:
Deferred: resolves when the events have been persisted
Expand Down Expand Up @@ -352,12 +350,12 @@ def _get_prevs_before_rejected_txn(txn, batch):
@log_function
def _persist_events_txn(
self,
txn,
events_and_contexts,
backfilled,
delete_existing=False,
state_delta_for_room={},
new_forward_extremeties={},
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
delete_existing: bool = False,
state_delta_for_room: Dict[str, DeltaState] = {},
new_forward_extremeties: Dict[str, List[str]] = {},
):
"""Insert some number of room events into the necessary database tables.
Expand All @@ -366,21 +364,16 @@ def _persist_events_txn(
whether the event was rejected.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]):
events to persist
backfilled (bool): True if the events were backfilled
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
txn
events_and_contexts: events to persist
backfilled: True if the events were backfilled
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of type/state keys to be
removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
list of the event ids which are the forward extremities.
state_delta_for_room: The current-state delta for each room.
new_forward_extremetie: The new forward extremities for each room.
For each room, a list of the event ids which are the forward
extremities.
"""
all_events_and_contexts = events_and_contexts
Expand Down Expand Up @@ -465,9 +458,15 @@ def _persist_events_txn(
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
def _update_current_state_txn(
self,
txn: LoggingTransaction,
state_delta_by_room: Dict[str, DeltaState],
stream_id: int,
):
for room_id, delta_state in iteritems(state_delta_by_room):
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert

# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
Expand Down
Loading

0 comments on commit a09011a

Please sign in to comment.