Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix providing a RoomStreamToken instance to _notify_app_services_ephemeral #11137

Merged
merged 9 commits into from
Nov 2, 2021
1 change: 1 addition & 0 deletions changelog.d/11137.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code.
22 changes: 17 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""
Expand All @@ -203,7 +203,7 @@ def notify_interested_services_ephemeral(
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.

new_token: The latest stream token.
new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
Expand All @@ -212,6 +212,19 @@ def notify_interested_services_ephemeral(
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
# vector clock implementation) as they don't support multiple
# stream writers.
#
# As a result, we simply assert that new_token is an integer.
# If we do end up needing to pass a RoomStreamToken down here
# in the future, using RoomStreamToken.stream (the minimum stream
# position) to convert to an ascending integer value should work.
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

services = [
service
for service in self.store.get_app_services()
Expand All @@ -231,14 +244,13 @@ async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
richvdh marked this conversation as resolved.
Show resolved Hide resolved
if stream_key == "typing_key" and new_token is not None:
if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
Expand Down
38 changes: 9 additions & 29 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,29 +383,6 @@ def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
except Exception:
logger.exception("Error notifying application services of event")

def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""Notify application services of ephemeral event activity.

Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users or []
)
except Exception:
logger.exception("Error notifying application services of event")

def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
Expand Down Expand Up @@ -467,12 +444,15 @@ def on_new_event(

self.notify_replication()

# Notify appservices
self._notify_app_services_ephemeral(
stream_key,
new_token,
users,
)
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")

def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async def add_user_signature_change_to_streams(
user_ids: the users who were signed

Returns:
THe new stream ID.
The new stream ID.
"""

async with self._device_list_id_gen.get_next() as stream_id:
Expand Down Expand Up @@ -1315,7 +1315,7 @@ def _update_remote_device_list_cache_txn(

async def add_device_change_to_streams(
self, user_id: str, device_ids: Collection[str], hosts: List[str]
):
) -> int:
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(
prefilled_cache=presence_cache_prefill,
)

async def update_presence(self, presence_states):
async def update_presence(self, presence_states) -> Tuple[int, int]:
assert self._can_persist_presence

stream_ordering_manager = self._presence_id_gen.get_next_mult(
Expand Down