Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't wake up destination transaction queue if they're not due for retry. #16223

Merged
merged 12 commits into from
Sep 4, 2023
6 changes: 4 additions & 2 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.metrics import Measure

from .units import Edu
Expand Down Expand Up @@ -245,7 +245,9 @@ async def send_presence_to_destinations(

self.notifier.on_new_replication_data()

def send_device_messages(self, destination: str, immediate: bool = True) -> None:
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
Expand Down
42 changes: 26 additions & 16 deletions synapse/federation/sender/__init__.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to add the filtering to

  • _send_pdu
  • send_presence_to_destinations
  • send_read_receipt

Are there other EDUs we should worry about here? (Typing? Device list stuff? to-device messages?)

What about the other methods on FederationSender?

  • send_edu
  • send_device_messages

is the point that only the former three handle multiple destinations in one call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Oh there is logic below for typing EDUs. But why don't they go via the federation sender too?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, mainly because a) send_edu and send_device_messages just take a single host, and b) I forgot about device messages 🤦

Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,11 @@ def build_and_send_edu(
raise NotImplementedError()

@abc.abstractmethod
def send_device_messages(self, destination: str, immediate: bool = True) -> None:
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
"""Tells the sender that a new device message is ready to be sent to the
destination. The `immediate` flag specifies whether the messages should
destinations. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
Expand Down Expand Up @@ -922,21 +924,29 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
else:
queue.send_edu(edu)

def send_device_messages(self, destination: str, immediate: bool = True) -> None:
if self.is_mine_server_name(destination):
logger.warning("Not sending device update to ourselves")
return

if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
return
async def send_device_messages(
self, destinations: StrCollection, immediate: bool = True
) -> None:
destinations = await filter_destinations_by_retry_limiter(
[
destination
for destination in destinations
if self._federation_shard_config.should_handle(
self._instance_name, destination
)
and not self.is_mine_server_name(destination)
],
clock=self.clock,
store=self.store,
retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
)

if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)
for destination in destinations:
if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)

def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
Expand Down
26 changes: 13 additions & 13 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,17 +836,16 @@ async def _handle_new_device_update_async(self) -> None:
user_id,
hosts,
)
for host in hosts:
self.federation_sender.send_device_messages(
host, immediate=False
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage.
# See https://github.com/matrix-org/synapse/issues/12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
await self.federation_sender.send_device_messages(
hosts, immediate=False
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage.
# See https://github.com/matrix-org/synapse/issues/12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
Comment on lines -839 to +848
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 -> 6 levels of indentation :)


if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
Expand Down Expand Up @@ -951,8 +950,9 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:

# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
for host in potentially_changed_hosts:
self.federation_sender.send_device_messages(host, immediate=False)
await self.federation_sender.send_device_messages(
potentially_changed_hosts, immediate=False
)


def _update_device_from_client_ips(
Expand Down
7 changes: 3 additions & 4 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,9 @@ async def send_device_message(
)

if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
await self.federation_sender.send_device_messages(remote_messages.keys())

async def get_events_for_dehydrated_device(
self,
Expand Down
6 changes: 2 additions & 4 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,14 @@ async def process_replication_rows(
for row in rows
if not row.entity.startswith("@") and not row.is_signature
}
for host in hosts:
self.federation_sender.send_device_messages(host, immediate=False)
await self.federation_sender.send_device_messages(hosts, immediate=False)

elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
# clients and remote servers, so we ignore entities that start with
# '@' (since they'll be local users rather than destinations).
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts:
self.federation_sender.send_device_messages(host)
await self.federation_sender.send_device_messages(hosts)

async def _on_new_receipts(
self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow]
Expand Down
16 changes: 12 additions & 4 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ def test_send_device_updates(self) -> None:
self.reactor.advance(1)

# a second call should produce no new device EDUs
self.hs.get_federation_sender().send_device_messages("host2")
self.get_success(
self.hs.get_federation_sender().send_device_messages(["host2"])
)
Comment on lines -345 to +348
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, more of these. Maybe we should resurrect #14519 , at least for the tests

self.assertEqual(self.edus, [])

# a second device
Expand Down Expand Up @@ -551,7 +553,9 @@ def test_unreachable_server(self) -> None:

# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.get_success(
self.hs.get_federation_sender().send_device_messages(["host2"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
Expand Down Expand Up @@ -602,7 +606,9 @@ def test_prune_outbound_device_pokes1(self) -> None:

# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.get_success(
self.hs.get_federation_sender().send_device_messages(["host2"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
Expand Down Expand Up @@ -657,7 +663,9 @@ def test_prune_outbound_device_pokes2(self) -> None:

# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.get_success(
self.hs.get_federation_sender().send_device_messages(["host2"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
Expand Down