Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow multiple workers to write to receipts stream. (#16432)
Browse files Browse the repository at this point in the history
Fixes #16417
  • Loading branch information
erikjohnston authored Oct 25, 2023
1 parent e182dbb commit ba47fea
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 89 deletions.
1 change: 1 addition & 0 deletions changelog.d/16432.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow multiple workers to write to receipts stream.
4 changes: 2 additions & 2 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"Must only specify one instance to handle `account_data` messages."
)

if len(self.writers.receipts) != 1:
if len(self.writers.receipts) == 0:
raise ConfigError(
"Must only specify one instance to handle `receipts` messages."
"Must specify at least one instance to handle `receipts` messages."
)

if len(self.writers.events) == 0:
Expand Down
42 changes: 23 additions & 19 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
DeviceListUpdates,
JsonDict,
JsonMapping,
MultiWriterStreamToken,
RoomAlias,
RoomStreamToken,
StreamKeyType,
Expand Down Expand Up @@ -217,7 +218,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
def notify_interested_services_ephemeral(
self,
stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
"""
Expand Down Expand Up @@ -259,19 +260,6 @@ def notify_interested_services_ephemeral(
):
return

# Assert that new_token is an integer (and not a RoomStreamToken).
# All of the supported streams that this function handles use an
# integer to track progress (rather than a RoomStreamToken - a
# vector clock implementation) as they don't support multiple
# stream writers.
#
# As a result, we simply assert that new_token is an integer.
# If we do end up needing to pass a RoomStreamToken down here
# in the future, using RoomStreamToken.stream (the minimum stream
# position) to convert to an ascending integer value should work.
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == StreamKeyType.TO_DEVICE
Expand All @@ -286,6 +274,9 @@ def notify_interested_services_ephemeral(
):
return

# We know we're not a `RoomStreamToken` at this point.
assert not isinstance(new_token, RoomStreamToken)

# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
Expand Down Expand Up @@ -327,7 +318,7 @@ async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: StreamKeyType,
new_token: int,
new_token: Union[int, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
Expand All @@ -340,6 +331,7 @@ async def _notify_interested_services_ephemeral(
#
# Instead we simply grab the latest typing updates in _handle_typing
# and, if they apply to this application service, send it off.
assert isinstance(new_token, int)
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
Expand All @@ -350,15 +342,23 @@ async def _notify_interested_services_ephemeral(
(service.id, stream_key)
):
if stream_key == StreamKeyType.RECEIPT:
assert isinstance(new_token, MultiWriterStreamToken)

# We store appservice tokens as integers, so we ignore
# the `instance_map` components and instead simply
# follow the base stream position.
new_token = MultiWriterStreamToken(stream=new_token.stream)

events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

# Persist the latest handled stream token for this appservice
await self.store.set_appservice_stream_type_pos(
service, "read_receipt", new_token
service, "read_receipt", new_token.stream
)

elif stream_key == StreamKeyType.PRESENCE:
assert isinstance(new_token, int)
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)

Expand All @@ -368,6 +368,7 @@ async def _notify_interested_services_ephemeral(
)

elif stream_key == StreamKeyType.TO_DEVICE:
assert isinstance(new_token, int)
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
Expand All @@ -383,6 +384,7 @@ async def _notify_interested_services_ephemeral(
)

elif stream_key == StreamKeyType.DEVICE_LIST:
assert isinstance(new_token, int)
device_list_summary = await self._get_device_list_summary(
service, new_token
)
Expand Down Expand Up @@ -432,7 +434,7 @@ async def _handle_typing(
return typing

async def _handle_receipts(
self, service: ApplicationService, new_token: int
self, service: ApplicationService, new_token: MultiWriterStreamToken
) -> List[JsonMapping]:
"""
Return the latest read receipts that the given application service should receive.
Expand All @@ -455,15 +457,17 @@ async def _handle_receipts(
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
if new_token is not None and new_token.stream <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []

from_token = MultiWriterStreamToken(stream=from_key)

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key, to_key=new_token
service=service, from_key=from_token, to_key=new_token
)
return receipts

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def _snapshot_all_rooms(
joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
receipt = await self.store.get_linearized_receipts_for_rooms(
joined_rooms,
to_key=int(now_token.receipt_key),
to_key=now_token.receipt_key,
)

receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
Expand Down
19 changes: 10 additions & 9 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from synapse.types import (
JsonDict,
JsonMapping,
MultiWriterStreamToken,
ReadReceipt,
StreamKeyType,
UserID,
Expand Down Expand Up @@ -200,7 +201,7 @@ async def received_client_receipt(
await self.federation_sender.send_read_receipt(receipt)


class ReceiptEventSource(EventSource[int, JsonMapping]):
class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.config = hs.config
Expand Down Expand Up @@ -273,13 +274,12 @@ def filter_out_private_receipts(
async def get_new_events(
self,
user: UserID,
from_key: int,
from_key: MultiWriterStreamToken,
limit: int,
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonMapping], int]:
from_key = int(from_key)
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
to_key = self.get_current_key()

if from_key == to_key:
Expand All @@ -296,8 +296,11 @@ async def get_new_events(
return events, to_key

async def get_new_events_as(
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonMapping], int]:
self,
from_key: MultiWriterStreamToken,
to_key: MultiWriterStreamToken,
service: ApplicationService,
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
"""Returns a set of new read receipt events that an appservice
may be interested in.
Expand All @@ -312,8 +315,6 @@ async def get_new_events_as(
appservice may be interested in.
* The current read receipt stream token.
"""
from_key = int(from_key)

if from_key == to_key:
return [], to_key

Expand All @@ -333,5 +334,5 @@ async def get_new_events_as(

return events, to_key

def get_current_key(self) -> int:
def get_current_key(self) -> MultiWriterStreamToken:
return self.store.get_max_receipt_stream_id()
7 changes: 6 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
DeviceListUpdates,
JsonDict,
JsonMapping,
MultiWriterStreamToken,
MutableStateMap,
Requester,
RoomStreamToken,
Expand Down Expand Up @@ -477,7 +478,11 @@ async def ephemeral_by_room(
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)

receipt_key = since_token.receipt_key if since_token else 0
receipt_key = (
since_token.receipt_key
if since_token
else MultiWriterStreamToken(stream=0)
)

receipt_source = self.event_sources.sources.receipt
receipts, receipt_key = await receipt_source.get_new_events(
Expand Down
45 changes: 43 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
TypeVar,
Union,
overload,
)

import attr
Expand All @@ -44,6 +46,7 @@
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
RoomStreamToken,
StrCollection,
Expand Down Expand Up @@ -127,7 +130,7 @@ def __init__(
def notify(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken],
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
Expand Down Expand Up @@ -452,10 +455,48 @@ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None:
except Exception:
logger.exception("Error pusher pool of event")

@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.ROOM],
new_token: RoomStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.RECEIPT],
new_token: MultiWriterStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

@overload
def on_new_event(
self,
stream_key: Literal[
StreamKeyType.ACCOUNT_DATA,
StreamKeyType.DEVICE_LIST,
StreamKeyType.PRESENCE,
StreamKeyType.PUSH_RULES,
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

def on_new_event(
self,
stream_key: StreamKeyType,
new_token: Union[int, RoomStreamToken],
new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ async def on_rdata(
StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
)
elif stream_name == ReceiptsStream.NAME:
new_token = self.store.get_max_receipt_stream_id()
self.notifier.on_new_event(
StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
StreamKeyType.RECEIPT, new_token, rooms=[row.room_id for row in rows]
)
await self._pusher_pool.on_new_receipts({row.user_id for row in rows})
elif stream_name == ToDeviceStream.NAME:
Expand Down
Loading

0 comments on commit ba47fea

Please sign in to comment.