Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't wake up destination transaction queue if they're not due for retry. #16223

Merged
merged 12 commits into from
Sep 4, 2023
1 change: 1 addition & 0 deletions changelog.d/16223.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve resource usage when sending data to a large number of remote hosts that are marked as "down".
12 changes: 7 additions & 5 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.metrics import Measure

from .units import Edu
Expand Down Expand Up @@ -229,7 +229,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
# nothing to do here: the replication listener will handle it.

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Expand All @@ -245,7 +245,9 @@ def send_presence_to_destinations(

self.notifier.on_new_replication_data()

def send_device_messages(self, destination: str, immediate: bool = True) -> None:
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
Expand Down Expand Up @@ -463,7 +465,7 @@ class ParsedFederationStreamData:
edus: Dict[str, List[Edu]]


def process_rows_for_federation(
async def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
Expand Down Expand Up @@ -496,7 +498,7 @@ def process_rows_for_federation(
parsed_row.add_to_buffer(buff)

for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
await transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)

Expand Down
86 changes: 61 additions & 25 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.per_destination_queue import (
CATCHUP_RETRY_INTERVAL,
PerDestinationQueue,
)
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand All @@ -161,9 +164,10 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
Expand Down Expand Up @@ -213,7 +217,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
raise NotImplementedError()

@abc.abstractmethod
def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
Expand Down Expand Up @@ -242,9 +246,11 @@ def build_and_send_edu(
raise NotImplementedError()

@abc.abstractmethod
def send_device_messages(self, destination: str, immediate: bool = True) -> None:
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
"""Tells the sender that a new device message is ready to be sent to the
destination. The `immediate` flag specifies whether the messages should
destinations. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
Expand Down Expand Up @@ -716,6 +722,13 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
pdu.internal_metadata.stream_ordering,
)

destinations = await filter_destinations_by_retry_limiter(
destinations,
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)

Expand Down Expand Up @@ -763,12 +776,20 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
domains: StrCollection = [
d
for d in domains_set
if not self.is_mine_server_name(d)
and self._federation_shard_config.should_handle(self._instance_name, d)
]

domains = await filter_destinations_by_retry_limiter(
domains,
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

if not domains:
return

Expand Down Expand Up @@ -816,7 +837,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
for queue in queues:
queue.flush_read_receipts_for_room(room_id)

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
Expand All @@ -831,13 +852,20 @@ def send_presence_to_destinations(
for state in states:
assert self.is_mine_id(state.user_id)

destinations = await filter_destinations_by_retry_limiter(
[
d
for d in destinations
if self._federation_shard_config.should_handle(self._instance_name, d)
],
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

for destination in destinations:
if self.is_mine_server_name(destination):
continue
if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue

self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
Expand Down Expand Up @@ -896,21 +924,29 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
else:
queue.send_edu(edu)

def send_device_messages(self, destination: str, immediate: bool = True) -> None:
if self.is_mine_server_name(destination):
logger.warning("Not sending device update to ourselves")
return

if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
return
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
destinations = await filter_destinations_by_retry_limiter(
[
destination
for destination in destinations
if self._federation_shard_config.should_handle(
self._instance_name, destination
)
and not self.is_mine_server_name(destination)
],
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)
for destination in destinations:
if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)

def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
Expand Down
6 changes: 5 additions & 1 deletion synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
)


# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000


class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
Expand Down Expand Up @@ -370,7 +374,7 @@ async def _transaction_transmission_loop(self) -> None:
),
)

if e.retry_interval > 60 * 60 * 1000:
if e.retry_interval > CATCHUP_RETRY_INTERVAL:
# we won't retry for another hour!
# (this suggests a significant outage)
# We drop pending EDUs because otherwise they will
Expand Down
26 changes: 13 additions & 13 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,17 +836,16 @@ async def _handle_new_device_update_async(self) -> None:
user_id,
hosts,
)
for host in hosts:
self.federation_sender.send_device_messages(
host, immediate=False
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage.
# See https://github.com/matrix-org/synapse/issues/12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
await self.federation_sender.send_device_messages(
hosts, immediate=False
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage.
# See https://github.com/matrix-org/synapse/issues/12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
Comment on lines -839 to +848
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 -> 6 levels of indentation :)


if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
Expand Down Expand Up @@ -951,8 +950,9 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:

# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
for host in potentially_changed_hosts:
self.federation_sender.send_device_messages(host, immediate=False)
await self.federation_sender.send_device_messages(
potentially_changed_hosts, immediate=False
)


def _update_device_from_client_ips(
Expand Down
7 changes: 3 additions & 4 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,9 @@ async def send_device_message(
)

if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
await self.federation_sender.send_device_messages(remote_messages.keys())

async def get_events_for_dehydrated_device(
self,
Expand Down
16 changes: 9 additions & 7 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ async def maybe_send_presence_to_interested_destinations(
)

for destination, host_states in hosts_to_states.items():
self._federation.send_presence_to_destinations(host_states, [destination])
await self._federation.send_presence_to_destinations(
host_states, [destination]
)

async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
Expand Down Expand Up @@ -936,7 +938,7 @@ async def _update_states(
)

for destination, states in hosts_to_states.items():
self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
states, [destination]
)

Expand Down Expand Up @@ -1508,7 +1510,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
or state.status_msg is not None
]

self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
Expand All @@ -1519,7 +1521,7 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
self._federation_queue.send_presence_to_destinations(
await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
Expand Down Expand Up @@ -2182,7 +2184,7 @@ def _clear_queue(self) -> None:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]

def send_presence_to_destinations(
async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
Expand All @@ -2202,7 +2204,7 @@ def send_presence_to_destinations(
return

if self._federation:
self._federation.send_presence_to_destinations(
await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
Expand Down Expand Up @@ -2325,7 +2327,7 @@ async def process_replication_rows(

for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
self._federation.send_presence_to_destinations(
await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
14 changes: 11 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
from synapse.types import JsonDict, Requester, StreamKeyType, UserID
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
Expand Down Expand Up @@ -150,8 +151,15 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None:
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)

hosts = await self._storage_controllers.state.get_current_hosts_in_room(
member.room_id
hosts: StrCollection = (
await self._storage_controllers.state.get_current_hosts_in_room(
member.room_id
)
)
hosts = await filter_destinations_by_retry_limiter(
hosts,
clock=self.clock,
store=self.store,
)
for domain in hosts:
if not self.is_mine_server_name(domain):
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:

# Send to remote destinations.
destination = UserID.from_string(user).domain
presence_handler.get_federation_queue().send_presence_to_destinations(
await presence_handler.get_federation_queue().send_presence_to_destinations(
presence_events, [destination]
)

Expand Down
Loading