Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

remove retry_on_integrity_error wrapper for persist_events #7848

Merged
merged 2 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7848.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove redundant `retry_on_integrity_error` wrapper for event persistence code.
clokep marked this conversation as resolved.
Show resolved Hide resolved
66 changes: 0 additions & 66 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,6 @@ def encode_json(json_object):
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))


def _retry_on_integrity_error(func):
"""Wraps a database function so that it gets retried on IntegrityError,
with `delete_existing=True` passed in.

Args:
func: function that returns a Deferred and accepts a `delete_existing` arg
"""

@wraps(func)
@defer.inlineCallbacks
def f(self, *args, **kwargs):
try:
res = yield func(self, *args, delete_existing=False, **kwargs)
except self.database_engine.module.IntegrityError:
logger.exception("IntegrityError, retrying.")
res = yield func(self, *args, delete_existing=True, **kwargs)
return res

return f


@attr.s(slots=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
Expand Down Expand Up @@ -134,7 +113,6 @@ def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore")
hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"

@_retry_on_integrity_error
@defer.inlineCallbacks
def _persist_events_and_state_updates(
self,
Expand All @@ -143,7 +121,6 @@ def _persist_events_and_state_updates(
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
delete_existing: bool = False,
):
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -157,7 +134,6 @@ def _persist_events_and_state_updates(
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
delete_existing

Returns:
Deferred: resolves when the events have been persisted
Expand Down Expand Up @@ -197,7 +173,6 @@ def _persist_events_and_state_updates(
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
delete_existing=delete_existing,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
Expand Down Expand Up @@ -341,7 +316,6 @@ def _persist_events_txn(
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
delete_existing: bool = False,
state_delta_for_room: Dict[str, DeltaState] = {},
new_forward_extremeties: Dict[str, List[str]] = {},
):
Expand Down Expand Up @@ -393,13 +367,6 @@ def _persist_events_txn(
# From this point onwards the events are only events that we haven't
# seen before.

if delete_existing:
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)

self._store_event_txn(txn, events_and_contexts=events_and_contexts)

# Insert into event_to_state_groups.
Expand Down Expand Up @@ -797,39 +764,6 @@ def _update_outliers_txn(self, txn, events_and_contexts):

return [ec for ec in events_and_contexts if ec[0] not in to_remove]

@classmethod
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
if not events_and_contexts:
# nothing to do here
return

logger.info("Deleting existing")

for table in (
"events",
"event_auth",
"event_json",
"event_edges",
"event_forward_extremities",
"event_reference_hashes",
"event_search",
"event_to_state_groups",
"state_events",
"rejections",
"redactions",
"room_memberships",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts],
)

for table in ("event_push_actions",):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
)

def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables

Expand Down