Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Be smarter about which hosts to send presence to when processing room…
Browse files Browse the repository at this point in the history
… joins (#9402)

This PR attempts to eliminate unnecessary presence sending work when your local server joins a room, or when a remote server joins a room your server is participating in by processing state deltas in chunks rather than individually.

---

When your server joins a room for the first time, it requests the historical state as well. This chunk of new state is passed to the presence handler which, after filtering that state down to only membership joins, will send presence updates to homeservers for each join processed.

It turns out that we were being a bit naive and processing each event individually, and sending out presence updates for every one of those joins. Even if many different joins were users on the same server (hello IRC bridges), we'd send presence to that same homeserver for every remote user join we saw.

This PR attempts to deduplicate all of that by processing the entire batch of state deltas at once, instead of only doing each join individually. We process the joins and note down which servers need which presence:

* If it was a local user join, send that user's latest presence to all servers in the room
* If it was a remote user join, send the presence for all local users in the room to that homeserver

We deduplicate by inserting all of those pending updates into a dictionary of the form:

```
{
  server_name1: {presence_update1, ...},
  server_name2: {presence_update1, presence_update2, ...}
}
```

Only after building this dict do we then start sending out presence updates.
  • Loading branch information
anoadragon453 authored Feb 19, 2021
1 parent 13e9029 commit 8bcfc2e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 19 deletions.
1 change: 1 addition & 0 deletions changelog.d/9402.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where a lot of unnecessary presence updates were sent when joining a room.
2 changes: 1 addition & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ async def send_presence(self, states: List[UserPresenceState]):
self._processing_pending_presence = False

def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
destinations (list[str])
Expand Down
56 changes: 42 additions & 14 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,9 @@ async def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]

for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
Expand All @@ -858,6 +861,7 @@ async def _handle_state_delta(self, deltas):

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

# Drop any event that isn't a membership join
if typ != EventTypes.Member:
continue

Expand All @@ -880,29 +884,54 @@ async def _handle_state_delta(self, deltas):
# Ignore changes to join events.
continue

await self._on_user_joined_room(room_id, state_key)
# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)

# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self.federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream.
"""
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.
Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.
Returns:
A tuple of destinations and presence updates to send to them.
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user

state = await self.current_state_for_user(user_id)
hosts = await self.state.get_current_hosts_in_room(room_id)
remote_hosts = await self.state.get_current_hosts_in_room(room_id)

# Filter out ourselves.
hosts = {host for host in hosts if host != self.server_name}
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]

self.federation.send_presence_to_destinations(
states=[state], destinations=hosts
)
state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
Expand All @@ -915,6 +944,8 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
# TODO: Check that this is actually a new server joining the
# room.

remote_host = get_domain_from_id(user_id)

users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))

Expand All @@ -934,10 +965,7 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
or state.status_msg is not None
]

if states:
self.federation.send_presence_to_destinations(
states=states, destinations=[get_domain_from_id(user_id)]
)
return [remote_host], states


def should_notify(old_state, new_state):
Expand Down
14 changes: 10 additions & 4 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def test_remote_joins(self):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server2"], states=[expected_state]
destinations=["server2"], states={expected_state}
)

#
Expand All @@ -533,7 +533,7 @@ def test_remote_joins(self):

self.federation_sender.send_presence.assert_not_called()
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server3"], states=[expected_state]
destinations=["server3"], states={expected_state}
)

def test_remote_gets_presence_when_local_user_joins(self):
Expand Down Expand Up @@ -584,8 +584,14 @@ def test_remote_gets_presence_when_local_user_joins(self):
self.presence_handler.current_state_for_user("@test2:server")
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
self.assertEqual(
self.federation_sender.send_presence_to_destinations.call_count, 2
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server3"], states={expected_state}
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server2"], states={expected_state}
)

def _add_new_user(self, room_id, user_id):
Expand Down

0 comments on commit 8bcfc2e

Please sign in to comment.