Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Be smarter about which hosts to send presence to when processing room joins #9402

Merged
merged 6 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9402.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Avoid sending unnecessary presence updates when joining a room.
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ async def send_presence(self, states: List[UserPresenceState]):
self._processing_pending_presence = False

def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
destinations (list[str])
Expand Down
56 changes: 42 additions & 14 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,9 @@ async def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]

for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
Expand All @@ -858,6 +861,7 @@ async def _handle_state_delta(self, deltas):

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

# Drop any event that isn't a membership join
if typ != EventTypes.Member:
continue

Expand All @@ -880,29 +884,54 @@ async def _handle_state_delta(self, deltas):
# Ignore changes to join events.
continue

await self._on_user_joined_room(room_id, state_key)
# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)

# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self.federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream.
"""
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.

Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.

Returns:
A tuple of (list of destinations, list of presence updates).
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

state = await self.current_state_for_user(user_id)
hosts = await self.state.get_current_hosts_in_room(room_id)
remote_hosts = await self.state.get_current_hosts_in_room(room_id)

# Filter out ourselves.
hosts = {host for host in hosts if host != self.server_name}
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]

self.federation.send_presence_to_destinations(
states=[state], destinations=hosts
)
state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
Expand All @@ -915,6 +944,8 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
# TODO: Check that this is actually a new server joining the
# room.

remote_host = get_domain_from_id(user_id)

users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))

Expand All @@ -934,10 +965,7 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
or state.status_msg is not None
]

if states:
self.federation.send_presence_to_destinations(
states=states, destinations=[get_domain_from_id(user_id)]
)
return [remote_host], states


def should_notify(old_state, new_state):
Expand Down
14 changes: 10 additions & 4 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def test_remote_joins(self):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server2"], states=[expected_state]
destinations=["server2"], states={expected_state}
)

#
Expand All @@ -533,7 +533,7 @@ def test_remote_joins(self):

self.federation_sender.send_presence.assert_not_called()
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server3"], states=[expected_state]
destinations=["server3"], states={expected_state}
)

def test_remote_gets_presence_when_local_user_joins(self):
Expand Down Expand Up @@ -584,8 +584,14 @@ def test_remote_gets_presence_when_local_user_joins(self):
self.presence_handler.current_state_for_user("@test2:server")
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
self.assertEqual(
self.federation_sender.send_presence_to_destinations.call_count, 2
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server3"], states={expected_state}
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server2"], states={expected_state}
)

def _add_new_user(self, room_id, user_id):
Expand Down