From 29d4b885edf7663be4d8e5af836d78fd67aa4beb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 16:04:45 +0100 Subject: [PATCH 01/19] Correctly deduplicate events with same transaction ID. Fixes #3365. --- synapse/handlers/message.py | 8 +++ synapse/handlers/room_member.py | 8 +++ synapse/storage/databases/main/events.py | 30 +++++++++ .../storage/databases/main/events_worker.py | 62 ++++++++++++++++++- .../main/schema/delta/58/19txn_id.sql | 29 +++++++++ synapse/storage/persist_events.py | 36 ++++++++++- 6 files changed, 169 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/databases/main/schema/delta/58/19txn_id.sql diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ee271e85e551..e011a667b764 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -752,6 +752,14 @@ async def create_and_send_nonmember_event( # extremities to pile up, which in turn leads to state resolution # taking longer. with (await self.limiter.queue(event_dict["room_id"])): + if txn_id and requester.access_token_id: + existing_event_id = await self.store.get_event_id_from_transaction_id( + requester.user.to_string(), requester.access_token_id, txn_id, + ) + if existing_event_id: + event = await self.store.get_event(existing_event_id) + return event, event.internal_metadata.stream_ordering + event, context = await self.create_event( requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 8feba8c90a39..630670a5c28c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -171,6 +171,14 @@ async def _local_membership_update( if requester.is_guest: content["kind"] = "guest" + if txn_id and requester.access_token_id: + existing_event_id = await self.store.get_event_id_from_transaction_id( + requester.user.to_string(), requester.access_token_id, txn_id, + ) + if existing_event_id: + event_pos = await self.store.get_position_for_event(existing_event_id) + return existing_event_id, event_pos.stream + event, context = await self.event_creation_handler.create_event( requester, { diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 18def01f5041..7fde8fc6dd13 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -367,6 +367,8 @@ def _persist_events_txn( self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._persist_transaction_ids_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) @@ -411,6 +413,34 @@ def _persist_events_txn( # room_memberships, where applicable. self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) + def _persist_transaction_ids_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ): + """Persist the mapping from transaction IDs to event IDs (if defined). + """ + + to_insert = [] + for event, _ in events_and_contexts: + token_id = getattr(event.internal_metadata, "token_id", None) + txn_id = getattr(event.internal_metadata, "txn_id", None) + if token_id and txn_id: + to_insert.append( + { + "event_id": event.event_id, + "user_id": event.sender, + "token_id": token_id, + "txn_id": txn_id, + "inserted_ts": self._clock.time_msec(), + } + ) + + if to_insert: + self.db_pool.simple_insert_many_txn( + txn, table="event_txn_id", values=to_insert, + ) + def _update_current_state_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f95679ebc440..6e5e0003ddd6 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging import threading @@ -130,6 +129,15 @@ def __init__(self, database: DatabasePool, db_conn, hs): db_conn, "events", "stream_ordering", step=-1 ) + if not hs.config.worker.worker_app: + # We periodically clean out old transaction ID mappings + self._clock.looping_call( + run_as_background_process, + 5 * 60 * 1000, + "_cleanup_old_transaction_ids", + self._cleanup_old_transaction_ids, + ) + self._get_event_cache = Cache( "*getEvent*", keylen=3, @@ -1287,3 +1295,55 @@ def get_next_event_to_expire_txn(txn): return await self.db_pool.runInteraction( desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) + + async def get_event_id_from_transaction_id( + self, user_id: str, token_id: str, txn_id: str + ) -> Optional[str]: + """Look up if we have already persisted an event for the transaction ID, + returning the event ID if so. + """ + return await self.db_pool.simple_select_one_onecol( + table="event_txn_id", + keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id}, + retcol="event_id", + allow_none=True, + desc="get_event_id_from_transaction_id", + ) + + async def get_already_persisted_events( + self, events: Iterable[EventBase] + ) -> Dict[str, str]: + """Look up if we have already persisted an event for the transaction ID, + returning a mapping from event ID in the given list to the event ID of + an existing event. + """ + + mapping = {} + + for event in events: + token_id = getattr(event.internal_metadata, "token_id", None) + txn_id = getattr(event.internal_metadata, "txn_id", None) + if token_id and txn_id: + existing = await self.get_event_id_from_transaction_id( + event.sender, token_id, txn_id + ) + if existing: + mapping[event.event_id] = existing + + return mapping + + async def _cleanup_old_transaction_ids(self): + """Cleans out transaction id mappings older than 24hrs. + """ + + def _cleanup_old_transaction_ids_txn(txn): + sql = """ + DELETE FROM event_txn_id + WHERE inserted_ts < ? + """ + one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 + txn.execute(sql, (one_day_ago,)) + + return await self.db_pool.runInteraction( + "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn, + ) diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql new file mode 100644 index 000000000000..217b03355de1 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql @@ -0,0 +1,29 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A map of recent events persisted with transaction IDs. Used to deduplicate +-- send event requests with the same transaction ID. +CREATE TABLE event_txn_id ( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + token_id BIGINT NOT NULL, + txn_id TEXT NOT NULL, + inserted_ts BIGINT NOT NULL +); + +CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id); +CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id); +CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts); diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 72939f3984bc..d693527e63d0 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -245,7 +245,10 @@ async def persist_event( self._maybe_start_persisting(event.room_id) - await make_deferred_yieldable(deferred) + replaced_events = await make_deferred_yieldable(deferred) + replaced_event = replaced_events.get(event.event_id) + if replaced_event: + event = await self.main_store.get_event(replaced_event) event_stream_id = event.internal_metadata.stream_ordering @@ -265,12 +268,37 @@ async def _persist_events( self, events_and_contexts: List[Tuple[EventBase, EventContext]], backfilled: bool = False, - ): + ) -> Dict[str, str]: """Calculates the change to current state and forward extremities, and persists the given events and with those updates. + + Returns: + A dictionary of event ID to event ID we didn't persist as we already + had another event persisted with the same TXN ID. """ + replaced_events = {} # type: Dict[str, str] if not events_and_contexts: - return + return replaced_events + + # Check if any of the events have a transaction ID that has already been + # persitsed, and if so we don't persist again. + # + # We should have checked this a long time before we get here, but its + # possible that different send event requests race in such a way that + # they both pass the earlier checks. + replaced_events = await self.main_store.get_already_persisted_events( + (event for event, _ in events_and_contexts) + ) + + if replaced_events: + events_and_contexts = [ + (e, ctx) + for e, ctx in events_and_contexts + if e.event_id not in replaced_events + ] + + if not events_and_contexts: + return replaced_events chunks = [ events_and_contexts[x : x + 100] @@ -439,6 +467,8 @@ async def _persist_events( await self._handle_potentially_left_users(potentially_left_users) + return replaced_events + async def _calculate_new_extremities( self, room_id: str, From 9b68b6341efe9d16e5e4984f61b66f9cac63a099 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 16:39:29 +0100 Subject: [PATCH 02/19] Filter through the potentially deduplicated events --- synapse/handlers/federation.py | 9 +++-- synapse/handlers/message.py | 51 ++++++++++++++++++-------- synapse/handlers/room_member.py | 6 +-- synapse/replication/http/send_event.py | 7 +++- synapse/storage/persist_events.py | 47 +++++++++++++++++++----- 5 files changed, 87 insertions(+), 33 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1a8144405ac0..0c23394c44f4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2966,17 +2966,20 @@ async def persist_events_and_notify( return result["max_stream_id"] else: assert self.storage.persistence - max_stream_token = await self.storage.persistence.persist_events( + + # Note that this returns the events that wer persisted, which may not be + # the same as we passed in if some were deduplicated due transaction IDs. + events, max_stream_token = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) if self._ephemeral_messages_enabled: - for (event, context) in event_and_contexts: + for event in events: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) if not backfilled: # Never notify for backfilled events - for event, _ in event_and_contexts: + for event in events: await self._notify_persisted_event(event, max_stream_token) return max_stream_token.stream diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e011a667b764..f2ff4e5a180f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -642,7 +642,7 @@ async def send_nonmember_event( context: EventContext, ratelimit: bool = True, ignore_shadow_ban: bool = False, - ) -> int: + ) -> Tuple[EventBase, int]: """ Persists and notifies local clients and federation of an event. @@ -654,8 +654,10 @@ async def send_nonmember_event( ignore_shadow_ban: True if shadow-banned users should be allowed to send this event. - Return: - The stream_id of the persisted event. + Returns: + The event and stream_id of the persisted event. The returned event + may not match the given event if it was deduplicated due to an + existing event matching the transaction ID. Raises: ShadowBanError if the requester has been shadow-banned. @@ -682,7 +684,10 @@ async def send_nonmember_event( event.event_id, prev_event.event_id, ) - return await self.store.get_stream_id_for_event(prev_event.event_id) + return ( + prev_event, + await self.store.get_stream_id_for_event(prev_event.event_id), + ) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit @@ -738,6 +743,11 @@ async def create_and_send_nonmember_event( ignore_shadow_ban: True if shadow-banned users should be allowed to send this event. + Returns: + The event and stream_id of the persisted event. The returned event + may not match the given event if it was deduplicated due to an + existing event matching the transaction ID. + Raises: ShadowBanError if the requester has been shadow-banned. """ @@ -770,7 +780,7 @@ async def create_and_send_nonmember_event( spam_error = "Spam is not permitted here" raise SynapseError(403, spam_error, Codes.FORBIDDEN) - stream_id = await self.send_nonmember_event( + event, stream_id = await self.send_nonmember_event( requester, event, context, @@ -851,7 +861,7 @@ async def handle_new_client_event( context: EventContext, ratelimit: bool = True, extra_users: List[UserID] = [], - ) -> int: + ) -> Tuple[EventBase, int]: """Processes a new event. This includes checking auth, persisting it, notifying users, sending to remote servers, etc. @@ -865,8 +875,10 @@ async def handle_new_client_event( ratelimit extra_users: Any extra users to notify about event - Return: - The stream_id of the persisted event. + Returns: + The event and stream_id of the persisted event. The returned event + may not match the given event if it was deduplicated due to an + existing event matching the transaction ID. """ if event.is_state() and (event.type, event.state_key) == ( @@ -922,14 +934,17 @@ async def handle_new_client_event( extra_users=extra_users, ) stream_id = result["stream_id"] + event_id = result["event_id"] + if event_id != event.event_id: + event = await self.store.get_event(event_id) event.internal_metadata.stream_ordering = stream_id - return stream_id + return event, stream_id - stream_id = await self.persist_and_notify_client_event( + event, stream_id = await self.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return stream_id + return event, stream_id except Exception: # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. @@ -974,7 +989,7 @@ async def persist_and_notify_client_event( context: EventContext, ratelimit: bool = True, extra_users: List[UserID] = [], - ) -> int: + ) -> Tuple[EventBase, int]: """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. @@ -1146,9 +1161,13 @@ def is_inviter_member_event(e): if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - event_pos, max_stream_token = await self.storage.persistence.persist_event( - event, context=context - ) + # Note that this returns the event that was persisted, which may not be + # the same as we passed in if it was deduplicated due transaction IDs. + ( + event, + event_pos, + max_stream_token, + ) = await self.storage.persistence.persist_event(event, context=context) if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. @@ -1169,7 +1188,7 @@ def _notify(): # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - return event_pos.stream + return event, event_pos.stream async def _bump_active_time(self, user: UserID) -> None: try: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 630670a5c28c..9704c10bc9cb 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -229,7 +229,7 @@ async def _local_membership_update( retry_after_ms=int(1000 * (time_allowed - time_now_s)) ) - stream_id = await self.event_creation_handler.handle_new_client_event( + event, stream_id = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target], ratelimit=ratelimit, ) @@ -700,7 +700,7 @@ async def send_membership_event( if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - await self.event_creation_handler.handle_new_client_event( + event, _ = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target_user], ratelimit=ratelimit ) @@ -1193,7 +1193,7 @@ async def _locally_reject_invite( context = await self.state_handler.compute_event_context(event) context.app_service = requester.app_service - stream_id = await self.event_creation_handler.handle_new_client_event( + event, stream_id = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[UserID.from_string(target_user)], ) return event.event_id, stream_id diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 9a3a694d5dfa..0eda680a32d0 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -116,11 +116,14 @@ async def _handle_request(self, request, event_id): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - stream_id = await self.event_creation_handler.persist_and_notify_client_event( + ( + event, + stream_id, + ) = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return 200, {"stream_id": stream_id} + return 200, {"stream_id": stream_id, "event_id": event.event_id} def register_servlets(hs, http_server): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index d693527e63d0..0636e17142cb 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -199,7 +199,7 @@ async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], backfilled: bool = False, - ) -> RoomStreamToken: + ) -> Tuple[List[EventBase], RoomStreamToken]: """ Write events to the database Args: @@ -209,7 +209,11 @@ async def persist_events( which might update the current state etc. Returns: - the stream ordering of the latest persisted event + List of events persisted, the current position room stream position. + The list of events persisted may not be the same as those passed in + if they were deduplicated due to an event already existing that + matched the transcation ID; the existing event is returned in such + a case. """ partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]] for event, ctx in events_and_contexts: @@ -225,19 +229,41 @@ async def persist_events( for room_id in partitioned: self._maybe_start_persisting(room_id) - await make_deferred_yieldable( + # The deferred returns a map from event ID to existing event ID if the + # event was deduplicated. (The dict may also include other entries if + # the event was persisted in a batch with other events). + # + # Since we use `defer.gatherResults` we need to merge the returned list + # of dicts into one. + ret_vals = await make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) + replaced_events = {} + for d in ret_vals: + replaced_events.update(d) + + events = [] + for event, _ in events_and_contexts: + existing_event_id = replaced_events.get(event.event_id) + if existing_event_id: + events.append(await self.main_store.get_event(existing_event_id)) + else: + events.append(event) - return self.main_store.get_room_max_token() + return ( + events, + self.main_store.get_room_max_token(), + ) async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False - ) -> Tuple[PersistedEventPosition, RoomStreamToken]: + ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: """ Returns: - The stream ordering of `event`, and the stream ordering of the - latest persisted event + The event, stream ordering of `event`, and the stream ordering of the + latest persisted event. The returned event may not match the given + event if it was deduplicated due to an existing event matching the + transaction ID. """ deferred = self._event_persist_queue.add_to_queue( event.room_id, [(event, context)], backfilled=backfilled @@ -245,6 +271,9 @@ async def persist_event( self._maybe_start_persisting(event.room_id) + # The deferred returns a map from event ID to existing event ID if the + # event was deduplicated. (The dict may also include other entries if + # the event was persisted in a batch with other events.) replaced_events = await make_deferred_yieldable(deferred) replaced_event = replaced_events.get(event.event_id) if replaced_event: @@ -253,12 +282,12 @@ async def persist_event( event_stream_id = event.internal_metadata.stream_ordering pos = PersistedEventPosition(self._instance_name, event_stream_id) - return pos, self.main_store.get_room_max_token() + return event, pos, self.main_store.get_room_max_token() def _maybe_start_persisting(self, room_id: str): async def persisting_queue(item): with Measure(self._clock, "persist_events"): - await self._persist_events( + return await self._persist_events( item.events_and_contexts, backfilled=item.backfilled ) From 76d1d943e56415d73b7aa7d524edad1da4d3b179 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 16:41:39 +0100 Subject: [PATCH 03/19] Newsfile --- changelog.d/8476.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8476.bugfix diff --git a/changelog.d/8476.bugfix b/changelog.d/8476.bugfix new file mode 100644 index 000000000000..993a269979af --- /dev/null +++ b/changelog.d/8476.bugfix @@ -0,0 +1 @@ +Fix message duplication if something goes wrong after persisting the event. From 0869a4f20b810bcb96054ac27e1ca91dee9834aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 17:20:15 +0100 Subject: [PATCH 04/19] Add unit test --- tests/handlers/test_message.py | 117 +++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tests/handlers/test_message.py diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py new file mode 100644 index 000000000000..f958f307961a --- /dev/null +++ b/tests/handlers/test_message.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from synapse.api.constants import EventTypes +from synapse.rest import admin +from synapse.rest.client.v1 import login, room +from synapse.types import create_requester + +from tests import unittest + +logger = logging.getLogger(__name__) + + +class EventCreationTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def test_duplicated_txn_id(self): + """Test that attempting to handle/persist an event with a transaction ID + that has already been persisted correctly returns the old event and does + *not* produce duplicate messages. + """ + + handler = self.hs.get_event_creation_handler() + persist_event_storage = self.hs.get_storage().persistence + + user_id = self.register_user("tester", "foobar") + access_token = self.login("tester", "foobar") + room_id = self.helper.create_room_as(user_id, tok=access_token) + + # We make the IDs up here, which is fine. + token_id = 4957834 + txn_id = "something_suitably_random" + + requester = create_requester(user_id, access_token_id=token_id) + + def create_duplicate_event(): + return self.get_success( + handler.create_event( + requester, + { + "type": EventTypes.Message, + "room_id": room_id, + "sender": requester.user.to_string(), + "content": {"msgtype": "m.text", "body": "Hello"}, + }, + token_id=4957834, + txn_id=txn_id, + ) + ) + + event1, context = create_duplicate_event() + + ret_event1, stream_id1 = self.get_success( + handler.handle_new_client_event(requester, event1, context) + ) + + self.assertEqual(event1.event_id, ret_event1.event_id) + + event2, context = create_duplicate_event() + + # We want to test that the deduplication at the persit event end works, + # so we want to make sure we test with different events. + self.assertNotEqual(event1.event_id, event2.event_id) + + ret_event2, stream_id2 = self.get_success( + handler.handle_new_client_event(requester, event2, context) + ) + + # Assert that the returned values match those from the initial event + # rather than the new one. + self.assertEqual(ret_event1.event_id, ret_event2.event_id) + self.assertEqual(stream_id1, stream_id2) + + # Let's test that calling `persist_event` directly also does the right + # thing. + event3, context = create_duplicate_event() + self.assertNotEqual(event1.event_id, event3.event_id) + + ret_event3, event_pos3, _ = self.get_success( + persist_event_storage.persist_event(event3, context) + ) + + # Assert that the returned values match those from the initial event + # rather than the new one. + self.assertEqual(ret_event1.event_id, ret_event3.event_id) + self.assertEqual(stream_id1, event_pos3.stream) + + # Let's test that calling `persist_events` directly also does the right + # thing. + event4, context = create_duplicate_event() + self.assertNotEqual(event1.event_id, event3.event_id) + + events, _ = self.get_success( + persist_event_storage.persist_events([(event3, context)]) + ) + ret_event4 = events[0] + + # Assert that the returned values match those from the initial event + # rather than the new one. + self.assertEqual(ret_event1.event_id, ret_event4.event_id) From 3613071e440deb74946114dd519e2121087913e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 17:21:27 +0100 Subject: [PATCH 05/19] Fix typo Co-authored-by: Patrick Cloke --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0c23394c44f4..3e4458a0ad5a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2967,7 +2967,7 @@ async def persist_events_and_notify( else: assert self.storage.persistence - # Note that this returns the events that wer persisted, which may not be + # Note that this returns the events that were persisted, which may not be # the same as we passed in if some were deduplicated due transaction IDs. events, max_stream_token = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled From 1a49e23555caaaad8b545a19e910c87bb16dc20d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 09:31:06 +0100 Subject: [PATCH 06/19] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/handlers/federation.py | 2 +- synapse/storage/persist_events.py | 4 ++-- tests/handlers/test_message.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3e4458a0ad5a..93211b2bae3a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2968,7 +2968,7 @@ async def persist_events_and_notify( assert self.storage.persistence # Note that this returns the events that were persisted, which may not be - # the same as we passed in if some were deduplicated due transaction IDs. + # the same as were passed in if some were deduplicated due to transaction IDs. events, max_stream_token = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 0636e17142cb..97e63a1dfc43 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -310,9 +310,9 @@ async def _persist_events( return replaced_events # Check if any of the events have a transaction ID that has already been - # persitsed, and if so we don't persist again. + # persisted, and if so we don't persist it again. # - # We should have checked this a long time before we get here, but its + # We should have checked this a long time before we get here, but it's # possible that different send event requests race in such a way that # they both pass the earlier checks. replaced_events = await self.main_store.get_already_persisted_events( diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index f958f307961a..232c915e10f2 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -60,7 +60,7 @@ def create_duplicate_event(): "sender": requester.user.to_string(), "content": {"msgtype": "m.text", "body": "Hello"}, }, - token_id=4957834, + token_id=token_id, txn_id=txn_id, ) ) From d22241eced127b1cd0c21a8c8a935c746d8891e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 09:40:02 +0100 Subject: [PATCH 07/19] Code review --- synapse/handlers/message.py | 2 +- synapse/storage/persist_events.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f2ff4e5a180f..9da47a0b0e57 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -686,7 +686,7 @@ async def send_nonmember_event( ) return ( prev_event, - await self.store.get_stream_id_for_event(prev_event.event_id), + prev_event.internal_metadata.stream_ordering, ) return await self.handle_new_client_event( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 97e63a1dfc43..d656957942a1 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -285,6 +285,14 @@ async def persist_event( return event, pos, self.main_store.get_room_max_token() def _maybe_start_persisting(self, room_id: str): + """Pokes the `_event_persist_queue` to start handling new items in the + queue, if not already in progress. + + Causes the deferreds returned by `add_to_queue` to resolve with: a + dictionary of event ID to event ID we didn't persist as we already had + another event persisted with the same TXN ID. + """ + async def persisting_queue(item): with Measure(self._clock, "persist_events"): return await self._persist_events( From 136b97e64c4030ce2ff91709e935cc2edbb83605 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 14:59:07 +0100 Subject: [PATCH 08/19] Fix tests.rest.client.test_third_party_rules --- tests/rest/client/test_third_party_rules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/client/test_third_party_rules.py b/tests/rest/client/test_third_party_rules.py index c12518c93105..94cd5ba8dfa1 100644 --- a/tests/rest/client/test_third_party_rules.py +++ b/tests/rest/client/test_third_party_rules.py @@ -104,7 +104,7 @@ async def check(ev, state): request, channel = self.make_request( "PUT", - "/_matrix/client/r0/rooms/%s/send/foo.bar.forbidden/1" % self.room_id, + "/_matrix/client/r0/rooms/%s/send/foo.bar.forbidden/2" % self.room_id, {}, access_token=self.tok, ) From 33909dd85bafa3f3d03406d1b1be0b14d0b74679 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 15:00:52 +0100 Subject: [PATCH 09/19] Fix typo Co-authored-by: Patrick Cloke --- tests/handlers/test_message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index 560f85a1bbf3..dbad7dc158e6 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From cd89f44d612e6012a4d6448fa4812fe391a4a5fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 15:01:34 +0100 Subject: [PATCH 10/19] moar words --- synapse/storage/databases/main/schema/delta/58/19txn_id.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql index 217b03355de1..31e81314b435 100644 --- a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql +++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql @@ -16,6 +16,9 @@ -- A map of recent events persisted with transaction IDs. Used to deduplicate -- send event requests with the same transaction ID. +-- +-- Note, transaction IDs are scoped to the user ID/access token that was used to +-- make the request. CREATE TABLE event_txn_id ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, From 7a1c4177668339333ccdc7c897156bd1c1350ae2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 14:22:37 +0100 Subject: [PATCH 11/19] Add comment as to why we fetch event again --- synapse/handlers/message.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 718623fe0b36..0a13673b06f7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -922,8 +922,14 @@ async def handle_new_client_event( stream_id = result["stream_id"] event_id = result["event_id"] if event_id != event.event_id: + # If we get a different event back then it means that its + # been de-duplicated, so we replace the given event with the + # one already persisted. event = await self.store.get_event(event_id) else: + # If we newly persisted the event then we need to update its + # stream_ordering entry manually (as it was persisted on + # another worker). event.internal_metadata.stream_ordering = stream_id return event From 8f5931d504c629546211e26ce59789e127b271f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 14:22:55 +0100 Subject: [PATCH 12/19] Make 'persist_and_notify_client_event' only return EventBase and document --- synapse/handlers/message.py | 11 ++++++++--- synapse/replication/http/send_event.py | 13 ++++++++----- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0a13673b06f7..2ca65fc8d731 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -933,7 +933,7 @@ async def handle_new_client_event( event.internal_metadata.stream_ordering = stream_id return event - event, stream_id = await self.persist_and_notify_client_event( + event = await self.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) @@ -982,11 +982,16 @@ async def persist_and_notify_client_event( context: EventContext, ratelimit: bool = True, extra_users: List[UserID] = [], - ) -> Tuple[EventBase, int]: + ) -> EventBase: """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. This should only be run on the instance in charge of persisting events. + + Returns: + The persisted event. This may be different than the given event if + it was de-duplicated (e.g. because we had already persisted an + event with the same transaction ID.) """ assert self.storage.persistence is not None assert self._events_shard_config.should_handle( @@ -1181,7 +1186,7 @@ def _notify(): # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - return event, event_pos.stream + return event async def _bump_active_time(self, user: UserID) -> None: try: diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 0eda680a32d0..36ba6b1c9297 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -116,14 +116,17 @@ async def _handle_request(self, request, event_id): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - ( - event, - stream_id, - ) = await self.event_creation_handler.persist_and_notify_client_event( + event = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return 200, {"stream_id": stream_id, "event_id": event.event_id} + return ( + 200, + { + "stream_id": event.internal_metadata.stream_ordering, + "event_id": event.event_id, + }, + ) def register_servlets(hs, http_server): From 22eb20647e9b2ba0aafe3f5c3d921e6628eab03b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 14:34:39 +0100 Subject: [PATCH 13/19] Comments --- synapse/handlers/room_member.py | 3 +++ synapse/storage/persist_events.py | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index dd05114908ad..e196e95923e0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -171,6 +171,9 @@ async def _local_membership_update( if requester.is_guest: content["kind"] = "guest" + # Check if we already have an event with a matching transaction ID. (We + # do this check just before we persist an event as well, but may as well + # do it up front for efficiency.) if txn_id and requester.access_token_id: existing_event_id = await self.store.get_event_id_from_transaction_id( requester.user.to_string(), requester.access_token_id, txn_id, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index b0afa7132db1..70e636b0bac0 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -96,7 +96,9 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled): Returns: defer.Deferred: a deferred which will resolve once the events are - persisted. Runs its callbacks *without* a logcontext. + persisted. Runs its callbacks *without* a logcontext. The result + is the same as that returned by the callback passed to + `handle_queue`. """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: @@ -229,7 +231,7 @@ async def persist_events( for room_id in partitioned: self._maybe_start_persisting(room_id) - # The deferred returns a map from event ID to existing event ID if the + # Each deferred returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events). # @@ -324,7 +326,8 @@ async def _persist_events( # # We should have checked this a long time before we get here, but it's # possible that different send event requests race in such a way that - # they both pass the earlier checks. + # they both pass the earlier checks. Checking here isn't racey as we can + # have only one `_persist_events` per room being called at a time. replaced_events = await self.main_store.get_already_persisted_events( (event for event, _ in events_and_contexts) ) From 3daf42174203c2eda8945ed23802a86ee7ba861b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 14:38:21 +0100 Subject: [PATCH 14/19] Fix types --- synapse/storage/databases/main/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 7f8ce1725fa1..4e418c59fe2e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1314,7 +1314,7 @@ def get_next_event_to_expire_txn(txn): ) async def get_event_id_from_transaction_id( - self, user_id: str, token_id: str, txn_id: str + self, user_id: str, token_id: int, txn_id: str ) -> Optional[str]: """Look up if we have already persisted an event for the transaction ID, returning the event ID if so. From 9503e172f3dfa0bc44350a777b0b4810cd86480a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 15:02:23 +0100 Subject: [PATCH 15/19] Add FK restraints and IF NOT EXISTS --- .../storage/databases/main/registration.py | 6 +++++- .../main/schema/delta/58/19txn_id.sql | 21 ++++++++++++------- tests/handlers/test_message.py | 9 +++++--- tests/unittest.py | 11 ++++++++-- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 16ba5457403a..c9ce6f05a335 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -939,7 +939,7 @@ async def add_access_token_to_user( token: str, device_id: Optional[str], valid_until_ms: Optional[int], - ) -> None: + ) -> int: """Adds an access token for the given user. Args: @@ -949,6 +949,8 @@ async def add_access_token_to_user( valid_until_ms: when the token is valid until. None for no expiry. Raises: StoreError if there was a problem adding this. + Returns: + The token ID """ next_id = self._access_tokens_id_gen.get_next() @@ -964,6 +966,8 @@ async def add_access_token_to_user( desc="add_access_token_to_user", ) + return next_id + def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str: old_device_id = self.db_pool.simple_select_one_onecol_txn( txn, "access_tokens", {"token": token}, "device_id" diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql index 31e81314b435..82c00dd90833 100644 --- a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql +++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql @@ -17,16 +17,23 @@ -- A map of recent events persisted with transaction IDs. Used to deduplicate -- send event requests with the same transaction ID. -- --- Note, transaction IDs are scoped to the user ID/access token that was used to --- make the request. -CREATE TABLE event_txn_id ( +-- Note: transaction IDs are scoped to the room ID/user ID/access token that was +-- used to make the request. +-- +-- Note: The foreign key constraints are ON DELETE CASCADE, as if we delete the +-- events or access token we don't want to try and de-duplicate the event. +CREATE TABLE IF NOT EXISTS event_txn_id ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, token_id BIGINT NOT NULL, txn_id TEXT NOT NULL, - inserted_ts BIGINT NOT NULL + inserted_ts BIGINT NOT NULL, + FOREIGN KEY (event_id) + REFERENCES events (event_id) ON DELETE CASCADE, + FOREIGN KEY (token_id) + REFERENCES access_tokens (id) ON DELETE CASCADE ); -CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id); -CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id); -CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts); +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id); +CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts); diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index dbad7dc158e6..d32de57515d4 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -44,12 +44,15 @@ def test_duplicated_txn_id(self): access_token = self.login("tester", "foobar") room_id = self.helper.create_room_as(user_id, tok=access_token) - # We make the IDs up here, which is fine. - token_id = 4957834 - txn_id = "something_suitably_random" + info = self.get_success( + self.hs.get_datastore().get_user_by_access_token(access_token,) + ) + token_id = info["token_id"] requester = create_requester(user_id, access_token_id=token_id) + txn_id = "something_suitably_random" + def create_duplicate_event(): return self.get_success( handler.create_event( diff --git a/tests/unittest.py b/tests/unittest.py index 5c87f6097ec8..6c1661c92c14 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -254,17 +254,24 @@ def setUp(self): if hasattr(self, "user_id"): if self.hijack_auth: + # We need a valid token ID to satisfy foreign key constraints. + token_id = self.get_success( + self.hs.get_datastore().add_access_token_to_user( + self.helper.auth_user_id, "some_fake_token", None, None, + ) + ) + async def get_user_by_access_token(token=None, allow_guest=False): return { "user": UserID.from_string(self.helper.auth_user_id), - "token_id": 1, + "token_id": token_id, "is_guest": False, } async def get_user_by_req(request, allow_guest=False, rights="access"): return create_requester( UserID.from_string(self.helper.auth_user_id), - 1, + token_id, False, False, None, From 46d49198b2c3f4acc5b915544491505fe3f13c8e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 15:23:22 +0100 Subject: [PATCH 16/19] Scope transaction IDs to rooms --- synapse/handlers/message.py | 5 ++++- synapse/handlers/room_member.py | 2 +- synapse/storage/databases/main/events.py | 1 + synapse/storage/databases/main/events_worker.py | 11 ++++++++--- .../databases/main/schema/delta/58/19txn_id.sql | 3 ++- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2ca65fc8d731..fc6a9c1ddfee 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -711,7 +711,10 @@ async def create_and_send_nonmember_event( with (await self.limiter.queue(event_dict["room_id"])): if txn_id and requester.access_token_id: existing_event_id = await self.store.get_event_id_from_transaction_id( - requester.user.to_string(), requester.access_token_id, txn_id, + event_dict["room_id"], + requester.user.to_string(), + requester.access_token_id, + txn_id, ) if existing_event_id: event = await self.store.get_event(existing_event_id) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e196e95923e0..dfb86ba2b0ca 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -176,7 +176,7 @@ async def _local_membership_update( # do it up front for efficiency.) if txn_id and requester.access_token_id: existing_event_id = await self.store.get_event_id_from_transaction_id( - requester.user.to_string(), requester.access_token_id, txn_id, + room_id, requester.user.to_string(), requester.access_token_id, txn_id, ) if existing_event_id: event_pos = await self.store.get_position_for_event(existing_event_id) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ac565afbd9d2..1a9baa15f70e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -423,6 +423,7 @@ def _persist_transaction_ids_txn( to_insert.append( { "event_id": event.event_id, + "room_id": event.room_id, "user_id": event.sender, "token_id": token_id, "txn_id": txn_id, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4e418c59fe2e..1e5d63c884ac 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1314,14 +1314,19 @@ def get_next_event_to_expire_txn(txn): ) async def get_event_id_from_transaction_id( - self, user_id: str, token_id: int, txn_id: str + self, room_id: str, user_id: str, token_id: int, txn_id: str ) -> Optional[str]: """Look up if we have already persisted an event for the transaction ID, returning the event ID if so. """ return await self.db_pool.simple_select_one_onecol( table="event_txn_id", - keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id}, + keyvalues={ + "room_id": room_id, + "user_id": user_id, + "token_id": token_id, + "txn_id": txn_id, + }, retcol="event_id", allow_none=True, desc="get_event_id_from_transaction_id", @@ -1342,7 +1347,7 @@ async def get_already_persisted_events( txn_id = getattr(event.internal_metadata, "txn_id", None) if token_id and txn_id: existing = await self.get_event_id_from_transaction_id( - event.sender, token_id, txn_id + event.room_id, event.sender, token_id, txn_id ) if existing: mapping[event.event_id] = existing diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql index 82c00dd90833..b2454121a825 100644 --- a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql +++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql @@ -24,6 +24,7 @@ -- events or access token we don't want to try and de-duplicate the event. CREATE TABLE IF NOT EXISTS event_txn_id ( event_id TEXT NOT NULL, + room_id TEXT NOT NULL, user_id TEXT NOT NULL, token_id BIGINT NOT NULL, txn_id TEXT NOT NULL, @@ -35,5 +36,5 @@ CREATE TABLE IF NOT EXISTS event_txn_id ( ); CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id); -CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id); +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id); CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts); From 807d44c410ffe8bcc9bed87a0347504e97acee84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 15:27:43 +0100 Subject: [PATCH 17/19] Add comment to repl /send_event API --- synapse/replication/http/send_event.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 36ba6b1c9297..4876b78fec8a 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -46,6 +46,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "ratelimit": true, "extra_users": [], } + + 200 OK + + { "stream_id": ..., "event_id": ... } + + The returned event ID may not match the sent event if it was deduplicated. """ NAME = "send_event" From 58a70d26b00de4429b8bbf668d44c276fad17a32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Oct 2020 18:19:21 +0100 Subject: [PATCH 18/19] Update synapse/replication/http/send_event.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/replication/http/send_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 4876b78fec8a..fc129dbaa7b7 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -49,7 +49,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): 200 OK - { "stream_id": ..., "event_id": ... } + { "stream_id": 12345, "event_id": "$abcdef..." } The returned event ID may not match the sent event if it was deduplicated. """ From 6419d09216f6230507e1d4214ba2bfad636eb9ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2020 10:44:57 +0100 Subject: [PATCH 19/19] Handle persisting duplicate events at the same time --- .../storage/databases/main/events_worker.py | 16 +++ tests/handlers/test_message.py | 109 ++++++++++++------ 2 files changed, 88 insertions(+), 37 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 1e5d63c884ac..01d57d4598bb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1338,19 +1338,35 @@ async def get_already_persisted_events( """Look up if we have already persisted an event for the transaction ID, returning a mapping from event ID in the given list to the event ID of an existing event. + + Also checks if there are duplicates in the given events, if there are + will map duplicates to the *first* event. """ mapping = {} + txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str] for event in events: token_id = getattr(event.internal_metadata, "token_id", None) txn_id = getattr(event.internal_metadata, "txn_id", None) + if token_id and txn_id: + # Check if this is a duplicate of an event in the given events. + existing = txn_id_to_event.get((event.room_id, token_id, txn_id)) + if existing: + mapping[event.event_id] = existing + continue + + # Check if this is a duplicate of an event we've already + # persisted. existing = await self.get_event_id_from_transaction_id( event.room_id, event.sender, token_id, txn_id ) if existing: mapping[event.event_id] = existing + txn_id_to_event[(event.room_id, token_id, txn_id)] = existing + else: + txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id return mapping diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index d32de57515d4..64e28bc639ac 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -13,11 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Tuple from synapse.api.constants import EventTypes +from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.rest import admin from synapse.rest.client.v1 import login, room from synapse.types import create_requester +from synapse.util.stringutils import random_string from tests import unittest @@ -31,60 +35,67 @@ class EventCreationTestCase(unittest.HomeserverTestCase): room.register_servlets, ] - def test_duplicated_txn_id(self): - """Test that attempting to handle/persist an event with a transaction ID - that has already been persisted correctly returns the old event and does - *not* produce duplicate messages. - """ - - handler = self.hs.get_event_creation_handler() - persist_event_storage = self.hs.get_storage().persistence + def prepare(self, reactor, clock, hs): + self.handler = self.hs.get_event_creation_handler() + self.persist_event_storage = self.hs.get_storage().persistence - user_id = self.register_user("tester", "foobar") - access_token = self.login("tester", "foobar") - room_id = self.helper.create_room_as(user_id, tok=access_token) + self.user_id = self.register_user("tester", "foobar") + self.access_token = self.login("tester", "foobar") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token) - info = self.get_success( - self.hs.get_datastore().get_user_by_access_token(access_token,) + self.info = self.get_success( + self.hs.get_datastore().get_user_by_access_token(self.access_token,) ) - token_id = info["token_id"] + self.token_id = self.info["token_id"] - requester = create_requester(user_id, access_token_id=token_id) + self.requester = create_requester(self.user_id, access_token_id=self.token_id) - txn_id = "something_suitably_random" + def _create_duplicate_event(self, txn_id: str) -> Tuple[EventBase, EventContext]: + """Create a new event with the given transaction ID. All events produced + by this method will be considered duplicates. + """ - def create_duplicate_event(): - return self.get_success( - handler.create_event( - requester, - { - "type": EventTypes.Message, - "room_id": room_id, - "sender": requester.user.to_string(), - "content": {"msgtype": "m.text", "body": "Hello"}, - }, - token_id=token_id, - txn_id=txn_id, - ) + # We create a new event with a random body, as otherwise we'll produce + # *exactly* the same event with the same hash, and so same event ID. + return self.get_success( + self.handler.create_event( + self.requester, + { + "type": EventTypes.Message, + "room_id": self.room_id, + "sender": self.requester.user.to_string(), + "content": {"msgtype": "m.text", "body": random_string(5)}, + }, + token_id=self.token_id, + txn_id=txn_id, ) + ) + + def test_duplicated_txn_id(self): + """Test that attempting to handle/persist an event with a transaction ID + that has already been persisted correctly returns the old event and does + *not* produce duplicate messages. + """ + + txn_id = "something_suitably_random" - event1, context = create_duplicate_event() + event1, context = self._create_duplicate_event(txn_id) ret_event1 = self.get_success( - handler.handle_new_client_event(requester, event1, context) + self.handler.handle_new_client_event(self.requester, event1, context) ) stream_id1 = ret_event1.internal_metadata.stream_ordering self.assertEqual(event1.event_id, ret_event1.event_id) - event2, context = create_duplicate_event() + event2, context = self._create_duplicate_event(txn_id) # We want to test that the deduplication at the persit event end works, # so we want to make sure we test with different events. self.assertNotEqual(event1.event_id, event2.event_id) ret_event2 = self.get_success( - handler.handle_new_client_event(requester, event2, context) + self.handler.handle_new_client_event(self.requester, event2, context) ) stream_id2 = ret_event2.internal_metadata.stream_ordering @@ -95,11 +106,11 @@ def create_duplicate_event(): # Let's test that calling `persist_event` directly also does the right # thing. - event3, context = create_duplicate_event() + event3, context = self._create_duplicate_event(txn_id) self.assertNotEqual(event1.event_id, event3.event_id) ret_event3, event_pos3, _ = self.get_success( - persist_event_storage.persist_event(event3, context) + self.persist_event_storage.persist_event(event3, context) ) # Assert that the returned values match those from the initial event @@ -109,14 +120,38 @@ def create_duplicate_event(): # Let's test that calling `persist_events` directly also does the right # thing. - event4, context = create_duplicate_event() + event4, context = self._create_duplicate_event(txn_id) self.assertNotEqual(event1.event_id, event3.event_id) events, _ = self.get_success( - persist_event_storage.persist_events([(event3, context)]) + self.persist_event_storage.persist_events([(event3, context)]) ) ret_event4 = events[0] # Assert that the returned values match those from the initial event # rather than the new one. self.assertEqual(ret_event1.event_id, ret_event4.event_id) + + def test_duplicated_txn_id_one_call(self): + """Test that we correctly handle duplicates that we try and persist at + the same time. + """ + + txn_id = "something_else_suitably_random" + + # Create two duplicate events to persist at the same time + event1, context1 = self._create_duplicate_event(txn_id) + event2, context2 = self._create_duplicate_event(txn_id) + + # Ensure their event IDs are different to start with + self.assertNotEqual(event1.event_id, event2.event_id) + + events, _ = self.get_success( + self.persist_event_storage.persist_events( + [(event1, context1), (event2, context2)] + ) + ) + + # Check that we've deduplicated the events. + self.assertEqual(len(events), 2) + self.assertEqual(events[0].event_id, events[1].event_id)