From 453f9819e97ee1ed9061ce6485bdd548af884c2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Apr 2021 15:01:32 +0100 Subject: [PATCH 1/7] Always use send_presence_to_destinations rather than send_presence This a) reduces the API surface and b) means that we calculate where to send presence on the presence writer, rather than federation senders. --- synapse/federation/send_queue.py | 70 +------------------ synapse/federation/sender/__init__.py | 96 +-------------------------- synapse/handlers/presence.py | 16 +++-- synapse/module_api/__init__.py | 21 ++++-- 4 files changed, 32 insertions(+), 171 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index e3f0bc2471f4..d71f04e43e41 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -76,9 +76,6 @@ def __init__(self, hs: "HomeServer"): # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] - # Stream position -> list[user_id] - self.presence_changed = SortedDict() # type: SortedDict[int, List[str]] - # Stores the destinations we need to explicitly send presence to about a # given user. # Stream position -> (user_id, destinations) @@ -96,7 +93,7 @@ def __init__(self, hs: "HomeServer"): self.edus = SortedDict() # type: SortedDict[int, Edu] - # stream ID for the next entry into presence_changed/keyed_edu_changed/edus. + # stream ID for the next entry into keyed_edu_changed/edus. self.pos = 1 # map from stream ID to the time that stream entry was generated, so that we @@ -117,7 +114,6 @@ def register(name: str, queue: Sized) -> None: for queue_name in [ "presence_map", - "presence_changed", "keyed_edu", "keyed_edu_changed", "edus", @@ -155,23 +151,12 @@ def _clear_queue_before_pos(self, position_to_delete: int) -> None: """Clear all the queues from before a given position""" with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps - keys = self.presence_changed.keys() - i = self.presence_changed.bisect_left(position_to_delete) - for key in keys[:i]: - del self.presence_changed[key] - - user_ids = { - user_id for uids in self.presence_changed.values() for user_id in uids - } - keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_destinations[key] - user_ids.update( - user_id for user_id, _ in self.presence_destinations.values() - ) + user_ids = {user_id for user_id, _ in self.presence_destinations.values()} to_del = [ user_id for user_id in self.presence_map if user_id not in user_ids @@ -244,23 +229,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: """ # nothing to do here: the replication listener will handle it. - def send_presence(self, states: List[UserPresenceState]) -> None: - """As per FederationSender - - Args: - states - """ - pos = self._next_pos() - - # We only want to send presence for our own users, so lets always just - # filter here just in case. - local_states = [s for s in states if self.is_mine_id(s.user_id)] - - self.presence_map.update({state.user_id: state for state in local_states}) - self.presence_changed[pos] = [state.user_id for state in local_states] - - self.notifier.on_new_replication_data() - def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -325,18 +293,6 @@ async def get_replication_rows( # of the federation stream. rows = [] # type: List[Tuple[int, BaseFederationRow]] - # Fetch changed presence - i = self.presence_changed.bisect_right(from_token) - j = self.presence_changed.bisect_right(to_token) + 1 - dest_user_ids = [ - (pos, user_id) - for pos, user_id_list in self.presence_changed.items()[i:j] - for user_id in user_id_list - ] - - for (key, user_id) in dest_user_ids: - rows.append((key, PresenceRow(state=self.presence_map[user_id]))) - # Fetch presence to send to destinations i = self.presence_destinations.bisect_right(from_token) j = self.presence_destinations.bisect_right(to_token) + 1 @@ -427,22 +383,6 @@ def add_to_buffer(self, buff): raise NotImplementedError() -class PresenceRow( - BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState -): - TypeId = "p" - - @staticmethod - def from_data(data): - return PresenceRow(state=UserPresenceState.from_dict(data)) - - def to_data(self): - return self.state.as_dict() - - def add_to_buffer(self, buff): - buff.presence.append(self.state) - - class PresenceDestinationsRow( BaseFederationRow, namedtuple( @@ -506,7 +446,6 @@ def add_to_buffer(self, buff): _rowtypes = ( - PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, @@ -518,7 +457,6 @@ def add_to_buffer(self, buff): ParsedFederationStreamData = namedtuple( "ParsedFederationStreamData", ( - "presence", # list(UserPresenceState) "presence_destinations", # list of tuples of UserPresenceState and destinations "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] @@ -543,7 +481,6 @@ def process_rows_for_federation( # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence=[], presence_destinations=[], keyed_edus={}, edus={}, @@ -559,9 +496,6 @@ def process_rows_for_federation( parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - if buff.presence: - transaction_queue.send_presence(buff.presence) - for state, destinations in buff.presence_destinations: transaction_queue.send_presence_to_destinations( states=[state], destinations=destinations diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 952ad39f8c2e..6266accaf533 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -24,8 +24,6 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu -from synapse.handlers.presence import get_interested_remotes -from synapse.logging.context import preserve_fn from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -34,7 +32,7 @@ ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken -from synapse.util.metrics import Measure, measure_func +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -79,15 +77,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: """ raise NotImplementedError() - @abc.abstractmethod - def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - raise NotImplementedError() - @abc.abstractmethod def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] @@ -176,11 +165,6 @@ def __init__(self, hs: "HomeServer"): ), ) - # Map of user_id -> UserPresenceState for all the pending presence - # to be sent out by user_id. Entries here get processed and put in - # pending_presence_by_dest - self.pending_presence = {} # type: Dict[str, UserPresenceState] - LaterGauge( "synapse_federation_transaction_queue_pending_pdus", "", @@ -201,8 +185,6 @@ def __init__(self, hs: "HomeServer"): self._is_processing = False self._last_poked_id = -1 - self._processing_pending_presence = False - # map from room_id to a set of PerDestinationQueues which we believe are # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, @@ -546,48 +528,6 @@ def _flush_rrs_for_room(self, room_id: str) -> None: for queue in queues: queue.flush_read_receipts_for_room(room_id) - @preserve_fn # the caller should not yield on this - async def send_presence(self, states: List[UserPresenceState]) -> None: - """Send the new presence states to the appropriate destinations. - - This actually queues up the presence states ready for sending and - triggers a background task to process them and send out the transactions. - """ - if not self.hs.config.use_presence: - # No-op if presence is disabled. - return - - # First we queue up the new presence by user ID, so multiple presence - # updates in quick succession are correctly handled. - # We only want to send presence for our own users, so lets always just - # filter here just in case. - self.pending_presence.update( - {state.user_id: state for state in states if self.is_mine_id(state.user_id)} - ) - - # We then handle the new pending presence in batches, first figuring - # out the destinations we need to send each state to and then poking it - # to attempt a new transaction. We linearize this so that we don't - # accidentally mess up the ordering and send multiple presence updates - # in the wrong order - if self._processing_pending_presence: - return - - self._processing_pending_presence = True - try: - while True: - states_map = self.pending_presence - self.pending_presence = {} - - if not states_map: - break - - await self._process_presence_inner(list(states_map.values())) - except Exception: - logger.exception("Error sending presence states to servers") - finally: - self._processing_pending_presence = False - def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: @@ -608,40 +548,6 @@ def send_presence_to_destinations( continue self._get_per_destination_queue(destination).send_presence(states) - @measure_func("txnqueue._process_presence") - async def _process_presence_inner(self, states: List[UserPresenceState]) -> None: - """Given a list of states populate self.pending_presence_by_dest and - poke to send a new transaction to each destination - """ - # We pull the presence router here instead of __init__ - # to prevent a dependency cycle: - # - # AuthHandler -> Notifier -> FederationSender - # -> PresenceRouter -> ModuleApi -> AuthHandler - if self._presence_router is None: - self._presence_router = self.hs.get_presence_router() - - assert self._presence_router is not None - - hosts_and_states = await get_interested_remotes( - self.store, - self._presence_router, - states, - self.state, - ) - - for destinations, states in hosts_and_states: - for destination in destinations: - if destination == self.server_name: - continue - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue - - self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu( self, destination: str, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e120dd1f4860..a0cc77986908 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -680,7 +680,7 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: if to_federation_ping: federation_presence_out_counter.inc(len(to_federation_ping)) - self._push_to_remotes(to_federation_ping.values()) + await self._push_to_remotes(to_federation_ping.values()) async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -920,15 +920,23 @@ async def _persist_and_notify(self, states): users=[UserID.from_string(u) for u in users_to_states], ) - self._push_to_remotes(states) + await self._push_to_remotes(states) - def _push_to_remotes(self, states): + async def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: states (list(UserPresenceState)) """ - self.federation.send_presence(states) + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + states, + self.state, + ) + + for destinations, states in hosts_and_states: + self.federation.send_presence_to_destinations(states, destinations) async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server.""" diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index b7dbbfc27c18..9b1030e26f7f 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.events import EventBase +from synapse.handlers.presence import get_interested_remotes from synapse.http.client import SimpleHttpClient from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -50,6 +51,11 @@ def __init__(self, hs, auth_handler): self._auth_handler = auth_handler self._server_name = hs.hostname self._presence_stream = hs.get_event_sources().sources["presence"] + self._state = hs.get_state_handler() + + self._federation = None + if hs.should_send_federation(): + self._federation = self._hs.get_federation_sender() # We expose these as properties below in order to attach a helpful docstring. self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient @@ -423,6 +429,9 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: # Force a presence initial_sync for this user next time self._send_full_presence_to_local_users.add(user) else: + if not self._federation: + continue + # Retrieve presence state for currently online users that this user # is considered interested in presence_events, _ = await self._presence_stream.get_new_events( @@ -430,12 +439,16 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: ) # Send to remote destinations - await make_deferred_yieldable( - # We pull the federation sender here as we can only do so on workers - # that support sending presence - self._hs.get_federation_sender().send_presence(presence_events) + hosts_and_states = await get_interested_remotes( + self._store, + self._hs.get_presence_router(), + presence_events, + self._state, ) + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) + class PublicRoomListManager: """Contains methods for adding to, removing from and querying whether a room From 3210487019b6ade29db9bfdadd4cda3b8dde5bdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 11:41:52 +0100 Subject: [PATCH 2/7] Use the presence stream to notify federation sender --- synapse/handlers/presence.py | 44 ++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a0cc77986908..a56dcfd65641 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -123,6 +123,7 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastore() + self.state = hs.get_state_handler() self._busy_presence_enabled = hs.config.experimental.msc3026_enabled @@ -263,6 +264,10 @@ def __init__(self, hs): self.hs = hs self.is_mine_id = hs.is_mine_id + self._federation = None + if hs.should_send_federation(): + self._federation = hs.get_federation_sender() + self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence @@ -388,6 +393,20 @@ async def notify_from_replication(self, states, stream_id): users=users_to_states.keys(), ) + # If this is a federation sender, notify about presence updates. + if not self._federation: + return + + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + states, + self.state, + ) + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) + async def process_replication_rows(self, token, rows): states = [ UserPresenceState( @@ -464,10 +483,11 @@ def __init__(self, hs: "HomeServer"): self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence + self._send_federation = hs.should_send_federation() + federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler("m.presence", self.incoming_presence) @@ -680,7 +700,15 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: if to_federation_ping: federation_presence_out_counter.inc(len(to_federation_ping)) - await self._push_to_remotes(to_federation_ping.values()) + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + list(to_federation_ping.values()), + self.state, + ) + + for destinations, states in hosts_and_states: + self.federation.send_presence_to_destinations(states, destinations) async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -920,14 +948,12 @@ async def _persist_and_notify(self, states): users=[UserID.from_string(u) for u in users_to_states], ) - await self._push_to_remotes(states) - - async def _push_to_remotes(self, states): - """Sends state updates to remote servers. + # We only want to poke the local federation sender, if any, as other + # workers will receive the presence updates via the presence replication + # stream. + if not self._send_federation: + return - Args: - states (list(UserPresenceState)) - """ hosts_and_states = await get_interested_remotes( self.store, self.presence_router, From 390b500c7b186cdbe2aceef3869ba21d2c1f564b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 11:00:54 +0100 Subject: [PATCH 3/7] Add a maybe_send_presence_to_interested_destinations helper function --- synapse/handlers/presence.py | 59 ++++++++++++++++------------------ synapse/module_api/__init__.py | 24 ++++---------- 2 files changed, 34 insertions(+), 49 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a56dcfd65641..9fa5b93d8e6b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -123,8 +123,13 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastore() + self.presence_router = hs.get_presence_router() self.state = hs.get_state_handler() + self._federation = None + if hs.should_send_federation(): + self._federation = hs.get_federation_sender() + self._busy_presence_enabled = hs.config.experimental.msc3026_enabled active_presence = self.store.take_presence_startup_info() @@ -250,6 +255,26 @@ async def process_replication_rows(self, token, rows): """Process presence stream rows received over replication.""" pass + async def maybe_send_presence_to_interested_destinations( + self, states: List[UserPresenceState] + ): + """If this instance is a federation sender, send the states to all + destinations that are interested. + """ + + if not self._federation: + return + + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + states, + self.state, + ) + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" @@ -264,11 +289,6 @@ def __init__(self, hs): self.hs = hs self.is_mine_id = hs.is_mine_id - self._federation = None - if hs.should_send_federation(): - self._federation = hs.get_federation_sender() - - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence # The number of ongoing syncs on this process, by user id. @@ -394,18 +414,7 @@ async def notify_from_replication(self, states, stream_id): ) # If this is a federation sender, notify about presence updates. - if not self._federation: - return - - hosts_and_states = await get_interested_remotes( - self.store, - self.presence_router, - states, - self.state, - ) - - for destinations, states in hosts_and_states: - self._federation.send_presence_to_destinations(states, destinations) + await self.maybe_send_presence_to_interested_destinations(states) async def process_replication_rows(self, token, rows): states = [ @@ -483,11 +492,8 @@ def __init__(self, hs: "HomeServer"): self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence - self._send_federation = hs.should_send_federation() - federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler("m.presence", self.incoming_presence) @@ -951,18 +957,7 @@ async def _persist_and_notify(self, states): # We only want to poke the local federation sender, if any, as other # workers will receive the presence updates via the presence replication # stream. - if not self._send_federation: - return - - hosts_and_states = await get_interested_remotes( - self.store, - self.presence_router, - states, - self.state, - ) - - for destinations, states in hosts_and_states: - self.federation.send_presence_to_destinations(states, destinations) + await self.maybe_send_presence_to_interested_destinations(states) async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server.""" diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 9b1030e26f7f..a1a2b9aeccd3 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -18,7 +18,6 @@ from twisted.internet import defer from synapse.events import EventBase -from synapse.handlers.presence import get_interested_remotes from synapse.http.client import SimpleHttpClient from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -53,10 +52,6 @@ def __init__(self, hs, auth_handler): self._presence_stream = hs.get_event_sources().sources["presence"] self._state = hs.get_state_handler() - self._federation = None - if hs.should_send_federation(): - self._federation = self._hs.get_federation_sender() - # We expose these as properties below in order to attach a helpful docstring. self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) @@ -429,25 +424,20 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: # Force a presence initial_sync for this user next time self._send_full_presence_to_local_users.add(user) else: - if not self._federation: - continue - # Retrieve presence state for currently online users that this user # is considered interested in presence_events, _ = await self._presence_stream.get_new_events( UserID.from_string(user), from_key=None, include_offline=False ) - # Send to remote destinations - hosts_and_states = await get_interested_remotes( - self._store, - self._hs.get_presence_router(), - presence_events, - self._state, - ) + # Send to remote destinations. - for destinations, states in hosts_and_states: - self._federation.send_presence_to_destinations(states, destinations) + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + await presence_handler.maybe_send_presence_to_interested_destinations( + presence_events + ) class PublicRoomListManager: From 10fdce3c3770316ab6f9899696c2bdd9e9b0ee13 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 12:04:57 +0100 Subject: [PATCH 4/7] Newsfile --- changelog.d/9828.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9828.feature diff --git a/changelog.d/9828.feature b/changelog.d/9828.feature new file mode 100644 index 000000000000..f56b0bb3bdeb --- /dev/null +++ b/changelog.d/9828.feature @@ -0,0 +1 @@ +Add experimental support for handling presence on a worker. From af42262e8fbc774975396cc113aede8867c2a885 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 16:59:28 +0100 Subject: [PATCH 5/7] Remove confusing duplicate `self.federation` var --- synapse/handlers/presence.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9fa5b93d8e6b..c705617d0301 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -127,9 +127,11 @@ def __init__(self, hs: "HomeServer"): self.state = hs.get_state_handler() self._federation = None - if hs.should_send_federation(): + if hs.should_send_federation() or not hs.config.worker_app: self._federation = hs.get_federation_sender() + self._send_federation = hs.should_send_federation() + self._busy_presence_enabled = hs.config.experimental.msc3026_enabled active_presence = self.store.take_presence_startup_info() @@ -262,9 +264,12 @@ async def maybe_send_presence_to_interested_destinations( destinations that are interested. """ - if not self._federation: + if not self._send_federation: return + # If this worker sends federation we must have a FederationSender. + assert self._federation + hosts_and_states = await get_interested_remotes( self.store, self.presence_router, @@ -491,7 +496,6 @@ def __init__(self, hs: "HomeServer"): self.server_name = hs.hostname self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.federation = hs.get_federation_sender() self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -713,8 +717,12 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: self.state, ) + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + for destinations, states in hosts_and_states: - self.federation.send_presence_to_destinations(states, destinations) + self._federation.send_presence_to_destinations(states, destinations) async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -1193,9 +1201,13 @@ async def _handle_state_delta(self, deltas): user_presence_states ) + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + # Send out user presence updates for each destination for destination, user_state_set in presence_destinations.items(): - self.federation.send_presence_to_destinations( + self._federation.send_presence_to_destinations( destinations=[destination], states=user_state_set ) From 38ad4820e55c59fe99eac4a939ba2bb779e810b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 17:03:36 +0100 Subject: [PATCH 6/7] Add a comment about federation ping --- synapse/handlers/presence.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c705617d0301..44e1a66999ca 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -702,6 +702,13 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) + # Check if we need to resend any presence states to remote hosts. We + # only do this for states that haven't been updated in a while to + # ensure that the remote host doesn't time the presence state out. + # + # Note that since these are states that have *not* been updated, + # they won't get sent down the normal presence replication stream, + # and so we have to explicitly send them via the federation stream. to_federation_ping = { user_id: state for user_id, state in to_federation_ping.items() From d5f7ca7d97685e46a471f96e48bc520a69a03420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Apr 2021 10:24:06 +0100 Subject: [PATCH 7/7] Update comment Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 44e1a66999ca..6460eb99521c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -971,7 +971,7 @@ async def _persist_and_notify(self, states): # We only want to poke the local federation sender, if any, as other # workers will receive the presence updates via the presence replication - # stream. + # stream (which is updated by `store.update_presence`). await self.maybe_send_presence_to_interested_destinations(states) async def incoming_presence(self, origin, content):