Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add StreamKeyType class and replace string literals with constants #12567

Merged
merged 14 commits into from
May 16, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/12567.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replace string literal instances of stream key types with typed constants.
10 changes: 5 additions & 5 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
ReplicationUserAccountDataRestServlet,
)
from synapse.streams import EventSource
from synapse.types import JsonDict, UserID
from synapse.types import JsonDict, StreamKeyType, UserID

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -105,7 +105,7 @@ async def add_account_data_to_room(
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)

await self._notify_modules(user_id, room_id, account_data_type, content)
Expand Down Expand Up @@ -141,7 +141,7 @@ async def add_account_data_for_user(
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)

await self._notify_modules(user_id, None, account_data_type, content)
Expand Down Expand Up @@ -176,7 +176,7 @@ async def add_tag_to_room(
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
Expand All @@ -201,7 +201,7 @@ async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> in
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
return max_stream_id
else:
Expand Down
39 changes: 20 additions & 19 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
JsonDict,
RoomAlias,
RoomStreamToken,
StreamKeyType,
UserID,
)
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -213,8 +214,8 @@ def notify_interested_services_ephemeral(
Args:
stream_key: The stream the event came from.

`stream_key` can be "typing_key", "receipt_key", "presence_key",
"to_device_key" or "device_list_key". Any other value for `stream_key`
`stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE,
StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key`
will cause this function to return early.

Ephemeral events will only be pushed to appservices that have opted into
Expand All @@ -235,11 +236,11 @@ def notify_interested_services_ephemeral(
# Only the following streams are currently supported.
# FIXME: We should use constants for these values.
if stream_key not in (
"typing_key",
"receipt_key",
"presence_key",
"to_device_key",
"device_list_key",
StreamKeyType.TYPING,
StreamKeyType.RECEIPT,
StreamKeyType.PRESENCE,
StreamKeyType.TO_DEVICE,
StreamKeyType.DEVICE_LIST,
):
return

Expand All @@ -258,14 +259,14 @@ def notify_interested_services_ephemeral(

# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == "to_device_key"
stream_key == StreamKeyType.TO_DEVICE
and not self._msc2409_to_device_messages_enabled
):
return

# Ignore device lists if the feature flag is not enabled
if (
stream_key == "device_list_key"
stream_key == StreamKeyType.DEVICE_LIST
and not self._msc3202_transaction_extensions_enabled
):
return
Expand All @@ -283,15 +284,15 @@ def notify_interested_services_ephemeral(
if (
stream_key
in (
"typing_key",
"receipt_key",
"presence_key",
"to_device_key",
StreamKeyType.TYPING,
StreamKeyType.RECEIPT,
StreamKeyType.PRESENCE,
StreamKeyType.TO_DEVICE,
)
and service.supports_ephemeral
)
or (
stream_key == "device_list_key"
stream_key == StreamKeyType.DEVICE_LIST
and service.msc3202_transaction_extensions
)
]
Expand All @@ -317,7 +318,7 @@ async def _notify_interested_services_ephemeral(
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
Expand All @@ -333,7 +334,7 @@ async def _notify_interested_services_ephemeral(
async with self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
):
if stream_key == "receipt_key":
if stream_key == StreamKeyType.RECEIPT:
events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

Expand All @@ -342,7 +343,7 @@ async def _notify_interested_services_ephemeral(
service, "read_receipt", new_token
)

elif stream_key == "presence_key":
elif stream_key == StreamKeyType.PRESENCE:
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

Expand All @@ -351,7 +352,7 @@ async def _notify_interested_services_ephemeral(
service, "presence", new_token
)

elif stream_key == "to_device_key":
elif stream_key == StreamKeyType.TO_DEVICE:
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
Expand All @@ -366,7 +367,7 @@ async def _notify_interested_services_ephemeral(
service, "to_device", new_token
)

elif stream_key == "device_list_key":
elif stream_key == StreamKeyType.DEVICE_LIST:
device_list_summary = await self._get_device_list_summary(
service, new_token
)
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from synapse.types import (
JsonDict,
StreamKeyType,
StreamToken,
UserID,
get_domain_from_id,
Expand Down Expand Up @@ -502,7 +503,7 @@ async def notify_device_update(
# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
self.notifier.on_new_event(
"device_list_key", position, users={user_id}, rooms=room_ids
StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
)

# We may need to do some processing asynchronously for local user IDs.
Expand All @@ -523,7 +524,9 @@ async def notify_user_signature_update(
from_user_id, user_ids
)

self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
self.notifier.on_new_event(
StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
)

async def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
set_tag,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -151,7 +151,7 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", last_stream_id, users=local_messages.keys()
StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)

async def _check_for_unknown_devices(
Expand Down Expand Up @@ -285,7 +285,7 @@ async def send_device_message(
# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", last_stream_id, users=local_messages.keys()
StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys()
)

if self.federation_sender:
Expand Down
13 changes: 8 additions & 5 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Requester,
RoomStreamToken,
StateMap,
StreamKeyType,
StreamToken,
UserID,
)
Expand Down Expand Up @@ -220,8 +221,10 @@ async def handle_room(event: RoomsForUser) -> None:
self.storage, user_id, messages
)

start_token = now_token.copy_and_replace("room_key", token)
end_token = now_token.copy_and_replace("room_key", room_end_token)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token.copy_and_replace(
StreamKeyType.ROOM, room_end_token
)
time_now = self.clock.time_msec()

d["messages"] = {
Expand Down Expand Up @@ -369,8 +372,8 @@ async def _room_initial_sync_parted(
self.storage, user_id, messages, is_peeking=is_peeking
)

start_token = StreamToken.START.copy_and_replace("room_key", token)
end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token)

time_now = self.clock.time_msec()

Expand Down Expand Up @@ -472,7 +475,7 @@ async def get_receipts() -> List[JsonDict]:
self.storage, user_id, messages, is_peeking=is_peeking
)

start_token = now_token.copy_and_replace("room_key", token)
start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
end_token = now_token

time_now = self.clock.time_msec()
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester
from synapse.types import JsonDict, Requester, StreamKeyType
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -491,7 +491,7 @@ async def get_messages(

if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
"room_key", leave_token
StreamKeyType.ROOM, leave_token
)

await self.hs.get_federation_handler().maybe_backfill(
Expand All @@ -513,7 +513,7 @@ async def get_messages(
event_filter=event_filter,
)

next_token = from_token.copy_and_replace("room_key", next_key)
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)

if events:
if event_filter:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -522,7 +522,7 @@ async def notify_from_replication(
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
"presence_key",
StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=users_to_states.keys(),
Expand Down Expand Up @@ -1137,7 +1137,7 @@ async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
"presence_key",
StreamKeyType.PRESENCE,
stream_id,
rooms=room_ids_to_states.keys(),
users=[UserID.from_string(u) for u in users_to_states],
Expand Down
12 changes: 10 additions & 2 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
from synapse.types import (
JsonDict,
ReadReceipt,
StreamKeyType,
UserID,
get_domain_from_id,
)

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -129,7 +135,9 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:

affected_room_ids = list({r.room_id for r in receipts})

self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
self.notifier.on_new_event(
StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
await self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
Expand Down
9 changes: 5 additions & 4 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
RoomID,
RoomStreamToken,
StateMap,
StreamKeyType,
StreamToken,
UserID,
create_requester,
Expand Down Expand Up @@ -1237,10 +1238,10 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]:
events_after=events_after,
state=await filter_evts(state_events),
aggregations=aggregations,
start=await token.copy_and_replace("room_key", results.start).to_string(
self.store
),
end=await token.copy_and_replace("room_key", results.end).to_string(
start=await token.copy_and_replace(
StreamKeyType.ROOM, results.start
).to_string(self.store),
end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
self.store
),
)
Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, UserID
from synapse.types import JsonDict, StreamKeyType, UserID
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand Down Expand Up @@ -655,11 +655,11 @@ async def _calculate_event_contexts(
"events_before": events_before,
"events_after": events_after,
"start": await now_token.copy_and_replace(
"room_key", res.start
StreamKeyType.ROOM, res.start
).to_string(self.store),
"end": await now_token.copy_and_replace(
StreamKeyType.ROOM, res.end
).to_string(self.store),
"end": await now_token.copy_and_replace("room_key", res.end).to_string(
self.store
),
}

if include_profile:
Expand Down
Loading