From 73904ab59f31aa8007a05eb7fe216f5cfc5b5b4c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 11:39:42 +0100 Subject: [PATCH 01/13] Add StreamKeyType class --- synapse/types.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/synapse/types.py b/synapse/types.py index 9ac688b23b28..77f420234f71 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -625,6 +625,22 @@ async def to_string(self, store: "DataStore") -> str: return "s%d" % (self.stream,) +class StreamKeyType: + """Known stream types. + + A stream is a list of entities ordered by an incrementing "stream token". + """ + + ROOM = "room_key" + PRESENCE = "presence_key" + TYPING = "typing_key" + RECEIPT = "receipt_key" + ACCOUNT_DATA = "account_data_key" + PUSH_RULES = "push_rules_key" + TO_DEVICE = "to_device_key" + DEVICE_LIST = "device_list_key" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class StreamToken: """A collection of keys joined together by underscores in the following From 9904f423c3d4fce53584cf58814cd625a324205d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 11:59:34 +0100 Subject: [PATCH 02/13] Replace instances of "room_key" --- synapse/handlers/initial_sync.py | 13 ++++++++----- synapse/handlers/pagination.py | 6 +++--- synapse/handlers/room.py | 9 +++++---- synapse/handlers/search.py | 10 +++++----- synapse/handlers/sync.py | 15 ++++++++++----- synapse/notifier.py | 3 ++- synapse/storage/databases/main/e2e_room_keys.py | 4 ++-- synapse/storage/databases/main/relations.py | 6 ++++-- synapse/types.py | 4 ++-- 9 files changed, 41 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index a7db8feb57eb..e163589c43bc 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -30,6 +30,7 @@ Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -220,8 +221,10 @@ async def handle_room(event: RoomsForUser) -> None: self.storage, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token) - end_token = now_token.copy_and_replace("room_key", room_end_token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) + end_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_end_token + ) time_now = self.clock.time_msec() d["messages"] = { @@ -369,8 +372,8 @@ async def _room_initial_sync_parted( self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token) - end_token = StreamToken.START.copy_and_replace("room_key", stream_token) + start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token) + end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token) time_now = self.clock.time_msec() @@ -472,7 +475,7 @@ async def get_receipts() -> List[JsonDict]: self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = now_token.copy_and_replace("room_key", token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7ee334037376..fb7e8b4ee681 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, StreamKeyType from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -491,7 +491,7 @@ async def get_messages( if leave_token.topological < curr_topo: from_token = from_token.copy_and_replace( - "room_key", leave_token + StreamKeyType.ROOM, leave_token ) await self.hs.get_federation_handler().maybe_backfill( @@ -513,7 +513,7 @@ async def get_messages( event_filter=event_filter, ) - next_token = from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) if events: if event_filter: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b31f00b517a9..8bfa0cbfc8cb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -72,6 +72,7 @@ RoomID, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, create_requester, @@ -1237,10 +1238,10 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]: events_after=events_after, state=await filter_evts(state_events), aggregations=aggregations, - start=await token.copy_and_replace("room_key", results.start).to_string( - self.store - ), - end=await token.copy_and_replace("room_key", results.end).to_string( + start=await token.copy_and_replace( + StreamKeyType.ROOM, results.start + ).to_string(self.store), + end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string( self.store ), ) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 5619f8f50e03..cd1c47dae8b1 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -24,7 +24,7 @@ from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.storage.state import StateFilter -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -655,11 +655,11 @@ async def _calculate_event_contexts( "events_before": events_before, "events_after": events_after, "start": await now_token.copy_and_replace( - "room_key", res.start + StreamKeyType.ROOM, res.start + ).to_string(self.store), + "end": await now_token.copy_and_replace( + StreamKeyType.ROOM, res.end ).to_string(self.store), - "end": await now_token.copy_and_replace("room_key", res.end).to_string( - self.store - ), } if include_profile: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5125126a807c..b3a7fa4caa7e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -37,6 +37,7 @@ Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -537,7 +538,9 @@ async def _load_filtered_recents( prev_batch_token = now_token if recents: room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_key + ) return TimelineBatch( events=recents, prev_batch=prev_batch_token, limited=False @@ -611,7 +614,7 @@ async def _load_filtered_recents( recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key) # Don't bother to bundle aggregations if the timeline is unlimited, # as clients will have all the necessary information. @@ -1826,7 +1829,7 @@ async def _get_rooms_changed( # stream token as it'll only be used in the context of this # room. (c.f. the docstring of `to_room_stream_token`). leave_token = since_token.copy_and_replace( - "room_key", leave_position.to_room_stream_token() + StreamKeyType.ROOM, leave_position.to_room_stream_token() ) # If this is an out of band message, like a remote invite @@ -1875,7 +1878,9 @@ async def _get_rooms_changed( if room_entry: events, start_key = room_entry - prev_batch_token = now_token.copy_and_replace("room_key", start_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, start_key + ) entry = RoomSyncResultBuilder( room_id=room_id, @@ -1972,7 +1977,7 @@ async def _get_all_rooms( continue leave_token = now_token.copy_and_replace( - "room_key", RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/notifier.py b/synapse/notifier.py index 16d15a1f3328..44a882b1e778 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -46,6 +46,7 @@ JsonDict, PersistedEventPosition, RoomStreamToken, + StreamKeyType, StreamToken, UserID, ) @@ -372,7 +373,7 @@ def _notify_pending_new_room_events( if users or rooms: self.on_new_event( - "room_key", + StreamKeyType.ROOM, max_room_stream_token, users=users, rooms=rooms, diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index b789a588a54b..af59be6b4854 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -21,7 +21,7 @@ from synapse.logging.opentracing import log_kv, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import LoggingTransaction -from synapse.types import JsonDict, JsonSerializable +from synapse.types import JsonDict, JsonSerializable, StreamKeyType from synapse.util import json_encoder @@ -126,7 +126,7 @@ async def add_e2e_room_keys( "message": "Set room key", "room_id": room_id, "session_id": session_id, - "room_key": room_key, + StreamKeyType.ROOM: room_key, } ) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index a5c31f6787d9..dc93c5f42c58 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -34,7 +34,7 @@ from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.engines import PostgresEngine -from synapse.types import JsonDict, RoomStreamToken, StreamToken +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.util.caches.descriptors import cached, cachedList logger = logging.getLogger(__name__) @@ -161,7 +161,9 @@ def _get_recent_references_for_event_txn( if len(events) > limit and last_topo_id and last_stream_id: next_key = RoomStreamToken(last_topo_id, last_stream_id) if from_token: - next_token = from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace( + StreamKeyType.ROOM, next_key + ) else: next_token = StreamToken( room_key=next_key, diff --git a/synapse/types.py b/synapse/types.py index 77f420234f71..43a542ea6936 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -752,9 +752,9 @@ def copy_and_advance(self, key, new_value) -> "StreamToken": """Advance the given key in the token to a new value if and only if the new value is after the old value. """ - if key == "room_key": + if key == StreamKeyType.ROOM: new_token = self.copy_and_replace( - "room_key", self.room_key.copy_and_advance(new_value) + StreamKeyType.ROOM, self.room_key.copy_and_advance(new_value) ) return new_token From 80cf36972ca3594663dbe3ffe83f91a662788c44 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:01:41 +0100 Subject: [PATCH 03/13] Replace instances of "presence_key" --- synapse/handlers/appservice.py | 9 +++++---- synapse/handlers/presence.py | 6 +++--- synapse/handlers/sync.py | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index b3894666ccf5..679140ebc549 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -38,6 +38,7 @@ JsonDict, RoomAlias, RoomStreamToken, + StreamKeyType, UserID, ) from synapse.util.async_helpers import Linearizer @@ -213,7 +214,7 @@ def notify_interested_services_ephemeral( Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key", "presence_key", + `stream_key` can be "typing_key", "receipt_key", StreamKeyType.PRESENCE, "to_device_key" or "device_list_key". Any other value for `stream_key` will cause this function to return early. @@ -237,7 +238,7 @@ def notify_interested_services_ephemeral( if stream_key not in ( "typing_key", "receipt_key", - "presence_key", + StreamKeyType.PRESENCE, "to_device_key", "device_list_key", ): @@ -285,7 +286,7 @@ def notify_interested_services_ephemeral( in ( "typing_key", "receipt_key", - "presence_key", + StreamKeyType.PRESENCE, "to_device_key", ) and service.supports_ephemeral @@ -342,7 +343,7 @@ async def _notify_interested_services_ephemeral( service, "read_receipt", new_token ) - elif stream_key == "presence_key": + elif stream_key == StreamKeyType.PRESENCE: events = await self._handle_presence(service, users, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d078162c2938..31df9a09afe1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -66,7 +66,7 @@ from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore from synapse.streams import EventSource -from synapse.types import JsonDict, UserID, get_domain_from_id +from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure @@ -522,7 +522,7 @@ async def notify_from_replication( room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=users_to_states.keys(), @@ -1137,7 +1137,7 @@ async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=[UserID.from_string(u) for u in users_to_states], diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b3a7fa4caa7e..e54357bfd5d8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1506,7 +1506,7 @@ async def _generate_sync_entry_for_presence( ) assert presence_key sync_result_builder.now_token = now_token.copy_and_replace( - "presence_key", presence_key + StreamKeyType.PRESENCE, presence_key ) extra_users_ids = set(newly_joined_or_invited_users) From 9e9caaa9f481cf30c476547f3f5b5ce13cc36658 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:02:31 +0100 Subject: [PATCH 04/13] Replace instances of "typing_key" --- synapse/handlers/appservice.py | 8 ++++---- synapse/handlers/sync.py | 2 +- synapse/handlers/typing.py | 4 ++-- synapse/replication/tcp/client.py | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 679140ebc549..2f56241811c8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -214,7 +214,7 @@ def notify_interested_services_ephemeral( Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key", StreamKeyType.PRESENCE, + `stream_key` can be StreamKeyType.TYPING, "receipt_key", StreamKeyType.PRESENCE, "to_device_key" or "device_list_key". Any other value for `stream_key` will cause this function to return early. @@ -236,7 +236,7 @@ def notify_interested_services_ephemeral( # Only the following streams are currently supported. # FIXME: We should use constants for these values. if stream_key not in ( - "typing_key", + StreamKeyType.TYPING, "receipt_key", StreamKeyType.PRESENCE, "to_device_key", @@ -284,7 +284,7 @@ def notify_interested_services_ephemeral( if ( stream_key in ( - "typing_key", + StreamKeyType.TYPING, "receipt_key", StreamKeyType.PRESENCE, "to_device_key", @@ -318,7 +318,7 @@ async def _notify_interested_services_ephemeral( logger.debug("Checking interested services for %s", stream_key) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - if stream_key == "typing_key": + if stream_key == StreamKeyType.TYPING: # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e54357bfd5d8..6dba6de2fd26 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -450,7 +450,7 @@ async def ephemeral_by_room( room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("typing_key", typing_key) + now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key) ephemeral_by_room: JsonDict = {} diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 6854428b7ca5..bb00750bfd47 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -25,7 +25,7 @@ ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -382,7 +382,7 @@ def _push_update_local(self, member: RoomMember, typing: bool) -> None: ) self.notifier.on_new_event( - "typing_key", self._latest_room_serial, rooms=[member.room_id] + StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id] ) async def get_all_typing_updates( diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 122892c7bca2..e06c7f62e040 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -43,7 +43,7 @@ EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -153,7 +153,7 @@ async def on_rdata( if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( From 22b56c6c4077e1e2341f3ebbec6ea09011381fe5 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:09:18 +0100 Subject: [PATCH 05/13] Replace instances of "receipt_key" --- synapse/handlers/appservice.py | 8 ++++---- synapse/handlers/receipts.py | 12 ++++++++++-- synapse/handlers/sync.py | 2 +- synapse/replication/tcp/client.py | 2 +- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2f56241811c8..2f8711d4a2be 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -214,7 +214,7 @@ def notify_interested_services_ephemeral( Args: stream_key: The stream the event came from. - `stream_key` can be StreamKeyType.TYPING, "receipt_key", StreamKeyType.PRESENCE, + `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, "to_device_key" or "device_list_key". Any other value for `stream_key` will cause this function to return early. @@ -237,7 +237,7 @@ def notify_interested_services_ephemeral( # FIXME: We should use constants for these values. if stream_key not in ( StreamKeyType.TYPING, - "receipt_key", + StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, "to_device_key", "device_list_key", @@ -285,7 +285,7 @@ def notify_interested_services_ephemeral( stream_key in ( StreamKeyType.TYPING, - "receipt_key", + StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, "to_device_key", ) @@ -334,7 +334,7 @@ async def _notify_interested_services_ephemeral( async with self._ephemeral_events_linearizer.queue( (service.id, stream_key) ): - if stream_key == "receipt_key": + if stream_key == StreamKeyType.RECEIPT: events = await self._handle_receipts(service, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index cfe860decc95..a80867bcc209 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -17,7 +17,13 @@ from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes from synapse.appservice import ApplicationService from synapse.streams import EventSource -from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id +from synapse.types import ( + JsonDict, + ReadReceipt, + StreamKeyType, + UserID, + get_domain_from_id, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -129,7 +135,9 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: affected_room_ids = list({r.room_id for r in receipts}) - self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids) + self.notifier.on_new_event( + StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids + ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6dba6de2fd26..06ebf0fd0945 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -472,7 +472,7 @@ async def ephemeral_by_room( room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("receipt_key", receipt_key) + now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key) for event in receipts: room_id = event["room_id"] diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e06c7f62e040..e3a6cdaa4581 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -165,7 +165,7 @@ async def on_rdata( ) elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows] + StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) await self._pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} From 3ccb48c1139b4e968d7717846572a79d39fed973 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:10:54 +0100 Subject: [PATCH 06/13] Replace instances of "account_data_key" --- synapse/handlers/account_data.py | 10 +++++----- synapse/replication/tcp/client.py | 2 +- .../server_notices/resource_limits_server_notices.py | 5 ++++- synapse/server_notices/server_notices_manager.py | 4 ++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 4af9fbc5d10a..0478448b47ea 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -23,7 +23,7 @@ ReplicationUserAccountDataRestServlet, ) from synapse.streams import EventSource -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -105,7 +105,7 @@ async def add_account_data_to_room( ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, room_id, account_data_type, content) @@ -141,7 +141,7 @@ async def add_account_data_for_user( ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, None, account_data_type, content) @@ -176,7 +176,7 @@ async def add_tag_to_room( ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: @@ -201,7 +201,7 @@ async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> in ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e3a6cdaa4581..9586779640b9 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -161,7 +161,7 @@ async def on_rdata( ) elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows] + StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows] ) elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py index 015dd08f05e4..ddb7e0f3e0b0 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -22,6 +22,7 @@ ) from synapse.api.errors import AuthError, ResourceLimitError, SynapseError from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG +from synapse.types import StreamKeyType if TYPE_CHECKING: from synapse.server import HomeServer @@ -179,7 +180,9 @@ async def _check_and_set_tags(self, user_id: str, room_id: str) -> None: max_id = await self._account_data_handler.add_tag_to_room( user_id, room_id, SERVER_NOTICE_ROOM_TAG, {} ) - self._notifier.on_new_event("account_data_key", max_id, users=[user_id]) + self._notifier.on_new_event( + StreamKeyType.ACCOUNT_DATA, max_id, users=[user_id] + ) async def _is_room_currently_blocked(self, room_id: str) -> Tuple[bool, List[str]]: """ diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index 48eae5fa062a..e607b5ec2ae0 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -16,7 +16,7 @@ from synapse.api.constants import EventTypes, Membership, RoomCreationPreset from synapse.events import EventBase -from synapse.types import Requester, UserID, create_requester +from synapse.types import Requester, StreamKeyType, UserID, create_requester from synapse.util.caches.descriptors import cached if TYPE_CHECKING: @@ -169,7 +169,7 @@ async def get_or_create_notice_room_for_user(self, user_id: str) -> str: max_id = await self._account_data_handler.add_tag_to_room( user_id, room_id, SERVER_NOTICE_ROOM_TAG, {} ) - self._notifier.on_new_event("account_data_key", max_id, users=[user_id]) + self._notifier.on_new_event(StreamKeyType.ACCOUNT_DATA, max_id, users=[user_id]) logger.info("Created server notices room %s for %s", room_id, user_id) return room_id From 163cf586d765f73c3ef516a8ea63db3793cca68d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:11:42 +0100 Subject: [PATCH 07/13] Replace instances of "push_rules_key" --- synapse/replication/tcp/client.py | 2 +- synapse/rest/client/push_rule.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 9586779640b9..9be2c254a769 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -157,7 +157,7 @@ async def on_rdata( ) elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows] + StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows] ) elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py index b98640b14ac5..1c6edcce9881 100644 --- a/synapse/rest/client/push_rule.py +++ b/synapse/rest/client/push_rule.py @@ -32,7 +32,7 @@ from synapse.push.rulekinds import PRIORITY_CLASS_MAP from synapse.rest.client._base import client_patterns from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException -from synapse.types import JsonDict +from synapse.types import JsonDict, StreamKeyType if TYPE_CHECKING: from synapse.server import HomeServer From 193ab408e7b7d11edf98b8eaabb8fa72867b1c29 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 12:12:31 +0100 Subject: [PATCH 08/13] Replace instances of "to_device_key" --- synapse/handlers/appservice.py | 10 +++++----- synapse/handlers/devicemessage.py | 6 +++--- synapse/handlers/sync.py | 2 +- synapse/notifier.py | 2 +- synapse/replication/tcp/client.py | 4 +++- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2f8711d4a2be..d8b020263473 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -215,7 +215,7 @@ def notify_interested_services_ephemeral( stream_key: The stream the event came from. `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, - "to_device_key" or "device_list_key". Any other value for `stream_key` + StreamKeyType.TO_DEVICE or "device_list_key". Any other value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into @@ -239,7 +239,7 @@ def notify_interested_services_ephemeral( StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, - "to_device_key", + StreamKeyType.TO_DEVICE, "device_list_key", ): return @@ -259,7 +259,7 @@ def notify_interested_services_ephemeral( # Ignore to-device messages if the feature flag is not enabled if ( - stream_key == "to_device_key" + stream_key == StreamKeyType.TO_DEVICE and not self._msc2409_to_device_messages_enabled ): return @@ -287,7 +287,7 @@ def notify_interested_services_ephemeral( StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, - "to_device_key", + StreamKeyType.TO_DEVICE, ) and service.supports_ephemeral ) @@ -352,7 +352,7 @@ async def _notify_interested_services_ephemeral( service, "presence", new_token ) - elif stream_key == "to_device_key": + elif stream_key == StreamKeyType.TO_DEVICE: # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. to_device_messages = await self._get_to_device_messages( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 4cb725d027c7..53668cce3bb4 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -26,7 +26,7 @@ set_tag, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -151,7 +151,7 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) async def _check_for_unknown_devices( @@ -285,7 +285,7 @@ async def send_device_message( # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) if self.federation_sender: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06ebf0fd0945..b74395840c5f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1401,7 +1401,7 @@ async def _generate_sync_entry_for_to_device( now_token.to_device_key, ) sync_result_builder.now_token = now_token.copy_and_replace( - "to_device_key", stream_id + StreamKeyType.TO_DEVICE, stream_id ) sync_result_builder.to_device = messages else: diff --git a/synapse/notifier.py b/synapse/notifier.py index 44a882b1e778..c44d77ea6340 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -443,7 +443,7 @@ def on_new_event( for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) - if stream_key == "to_device_key": + if stream_key == StreamKeyType.TO_DEVICE: issue9533_logger.debug( "to-device messages stream id %s, awaking streams for %s", new_token, diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 9be2c254a769..4af752b252e6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -173,7 +173,9 @@ async def on_rdata( elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event("to_device_key", token, users=entities) + self.notifier.on_new_event( + StreamKeyType.TO_DEVICE, token, users=entities + ) elif stream_name == DeviceListsStream.NAME: all_room_ids: Set[str] = set() for row in rows: From 6fbacde149cf8b987e9c0acd4863806dcd169cea Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 15:01:35 +0100 Subject: [PATCH 09/13] Replace instances of "device_list_key" --- synapse/handlers/appservice.py | 10 +++++----- synapse/handlers/device.py | 7 +++++-- synapse/replication/tcp/client.py | 4 +++- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d8b020263473..578c5bccc173 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -215,7 +215,7 @@ def notify_interested_services_ephemeral( stream_key: The stream the event came from. `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, - StreamKeyType.TO_DEVICE or "device_list_key". Any other value for `stream_key` + StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into @@ -240,7 +240,7 @@ def notify_interested_services_ephemeral( StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, StreamKeyType.TO_DEVICE, - "device_list_key", + StreamKeyType.DEVICE_LIST, ): return @@ -266,7 +266,7 @@ def notify_interested_services_ephemeral( # Ignore device lists if the feature flag is not enabled if ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and not self._msc3202_transaction_extensions_enabled ): return @@ -292,7 +292,7 @@ def notify_interested_services_ephemeral( and service.supports_ephemeral ) or ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and service.msc3202_transaction_extensions ) ] @@ -367,7 +367,7 @@ async def _notify_interested_services_ephemeral( service, "to_device", new_token ) - elif stream_key == "device_list_key": + elif stream_key == StreamKeyType.DEVICE_LIST: device_list_summary = await self._get_device_list_summary( service, new_token ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a91b1ee4d5f4..1d6d1f8a9248 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,6 +43,7 @@ ) from synapse.types import ( JsonDict, + StreamKeyType, StreamToken, UserID, get_domain_from_id, @@ -502,7 +503,7 @@ async def notify_device_update( # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. self.notifier.on_new_event( - "device_list_key", position, users={user_id}, rooms=room_ids + StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids ) # We may need to do some processing asynchronously for local user IDs. @@ -523,7 +524,9 @@ async def notify_user_signature_update( from_user_id, user_ids ) - self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, position, users=[from_user_id] + ) async def user_left_room(self, user: UserID, room_id: str) -> None: user_id = user.to_string() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 4af752b252e6..0a7d58b16240 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -182,7 +182,9 @@ async def on_rdata( if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids + ) elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] From c14d8726761ad74f99409a7ce488efee6c357c56 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 27 Apr 2022 15:22:36 +0100 Subject: [PATCH 10/13] Changelog --- changelog.d/12567.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12567.misc diff --git a/changelog.d/12567.misc b/changelog.d/12567.misc new file mode 100644 index 000000000000..35f08569bada --- /dev/null +++ b/changelog.d/12567.misc @@ -0,0 +1 @@ +Replace string literal instances of stream key types with typed constants. \ No newline at end of file From eee450c49604fa57d5b7b1fda5126e6a3d825238 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 28 Apr 2022 14:13:11 +0100 Subject: [PATCH 11/13] lint --- synapse/rest/client/push_rule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py index 1c6edcce9881..b98640b14ac5 100644 --- a/synapse/rest/client/push_rule.py +++ b/synapse/rest/client/push_rule.py @@ -32,7 +32,7 @@ from synapse.push.rulekinds import PRIORITY_CLASS_MAP from synapse.rest.client._base import client_patterns from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException -from synapse.types import JsonDict, StreamKeyType +from synapse.types import JsonDict if TYPE_CHECKING: from synapse.server import HomeServer From caa54d3fd75a233cdef157b4296648c1ad4306c6 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 28 Apr 2022 14:51:09 +0100 Subject: [PATCH 12/13] Mark all StreamKeyType's as Final --- synapse/types.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index 43a542ea6936..7117e912c3b3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -20,6 +20,7 @@ Any, ClassVar, Dict, + Final, List, Mapping, Match, @@ -631,14 +632,14 @@ class StreamKeyType: A stream is a list of entities ordered by an incrementing "stream token". """ - ROOM = "room_key" - PRESENCE = "presence_key" - TYPING = "typing_key" - RECEIPT = "receipt_key" - ACCOUNT_DATA = "account_data_key" - PUSH_RULES = "push_rules_key" - TO_DEVICE = "to_device_key" - DEVICE_LIST = "device_list_key" + ROOM: Final = "room_key" + PRESENCE: Final = "presence_key" + TYPING: Final = "typing_key" + RECEIPT: Final = "receipt_key" + ACCOUNT_DATA: Final = "account_data_key" + PUSH_RULES: Final = "push_rules_key" + TO_DEVICE: Final = "to_device_key" + DEVICE_LIST: Final = "device_list_key" @attr.s(slots=True, frozen=True, auto_attribs=True) From 263bac1f65a7624bb3c3820838b8b7ca0777ca83 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 28 Apr 2022 16:11:53 +0100 Subject: [PATCH 13/13] Fix Final import --- synapse/types.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index 7117e912c3b3..75d126feaa8f 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -20,7 +20,6 @@ Any, ClassVar, Dict, - Final, List, Mapping, Match, @@ -36,7 +35,7 @@ import attr from frozendict import frozendict from signedjson.key import decode_verify_key_bytes -from typing_extensions import TypedDict +from typing_extensions import Final, TypedDict from unpaddedbase64 import decode_base64 from zope.interface import Interface