Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't go into federation catch-up mode so easily #9561

Merged
merged 9 commits into from
Mar 15, 2021
1 change: 1 addition & 0 deletions changelog.d/9561.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase the threshold at which outbound federation to a server goes into "catch-up" mode, which is expensive for the remote server to handle.
112 changes: 54 additions & 58 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# Flag to signal to any running transmission loop that there is new data
# queued up to be sent.
self._new_data_to_send = False

# True whilst we are sending events that the remote homeserver missed
# because it was unreachable. We start in this state so we can perform
# catch-up at startup.
Expand Down Expand Up @@ -208,6 +212,10 @@ def attempt_new_transaction(self) -> None:
transaction in the background.
"""

# Mark that we (may) have new things to send, so that any running
# transmission loop will recheck whether there is stuff to send.
self._new_data_to_send = True

if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
Expand Down Expand Up @@ -250,6 +258,8 @@ async def _transaction_transmission_loop(self) -> None:

pending_pdus = []
while True:
self._new_data_to_send = False

# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

Expand All @@ -266,15 +276,6 @@ async def _transaction_transmission_loop(self) -> None:

pending_edus = device_update_edus + to_device_edus

# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.

pending_pdus = self._pending_pdus

# We can only include at most 50 PDUs per transactions
Expand Down Expand Up @@ -320,55 +321,58 @@ async def _transaction_transmission_loop(self) -> None:
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return

# If we've gotten told about new things to send during
# checking for things to send, we try looking again.
# Otherwise new PDUs or EDUs might arrive in the meantime,
# but not get sent because we hold the
# `transmission_loop_running` flag.
if self._new_data_to_send:
continue
else:
return

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

# END CRITICAL SECTION

success = await self._transaction_manager.send_new_transaction(
await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
final_pdu = pending_pdus[-1]
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
else:
break
# also mark the device updates as sent
if device_update_edus:
logger.info("Marking as sent %r %r", self._destination, dev_list_id)
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
final_pdu = pending_pdus[-1]
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
Expand Down Expand Up @@ -401,7 +405,7 @@ async def _transaction_transmission_loop(self) -> None:
self._pending_presence = {}
self._pending_rrs = {}

self._start_catching_up()
self._start_catching_up()
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
Expand All @@ -412,7 +416,6 @@ async def _transaction_transmission_loop(self) -> None:
e,
)

self._start_catching_up()
except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
Expand All @@ -422,16 +425,12 @@ async def _transaction_transmission_loop(self) -> None:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)

self._start_catching_up()
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
Comment on lines 351 to 354
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I wonder if we should try to log the event IDs on one line here instead of 50. In another PR though.


self._start_catching_up()
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
Expand Down Expand Up @@ -499,13 +498,10 @@ async def _catch_up_transmission_loop(self) -> None:
rooms = [p.room_id for p in catchup_pdus]
logger.info("Catching up rooms to %s: %r", self._destination, rooms)

success = await self._transaction_manager.send_new_transaction(
await self._transaction_manager.send_new_transaction(
self._destination, catchup_pdus, []
)

if not success:
return

sent_transactions_counter.inc()
final_pdu = catchup_pdus[-1]
self._last_successful_stream_ordering = cast(
Expand Down
48 changes: 14 additions & 34 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,12 @@ async def send_new_transaction(
destination: str,
pdus: List[EventBase],
edus: List[Edu],
) -> bool:
) -> None:
"""
Args:
destination: The destination to send to (e.g. 'example.org')
pdus: In-order list of PDUs to send
edus: List of EDUs to send

Returns:
True iff the transaction was successful
Comment on lines -79 to -80
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth mentioning what this raises while we're here? 😄

"""

# Make a transaction-sending opentracing span. This span follows on from
Expand All @@ -96,8 +93,6 @@ async def send_new_transaction(
edu.strip_context()

with start_active_span_follows_from("send_transaction", span_contexts):
success = True

logger.debug("TX [%s] _attempt_new_transaction", destination)

txn_id = str(self._next_txn_id)
Expand Down Expand Up @@ -152,44 +147,29 @@ def json_data_cb():
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)

if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warning(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
set_tag(tags.ERROR, True)

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise

logger.info("TX [%s] {%s} got 200 response", destination, txn_id)

for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warning(
"TX [%s] {%s} Failed to send event %s",
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
p.event_id,
e_id,
r,
)
success = False

if success and pdus and destination in self._federation_metrics_domains:
if pdus and destination in self._federation_metrics_domains:
last_pdu = pdus[-1]
last_pdu_ts_metric.labels(server_name=destination).set(
last_pdu.origin_server_ts / 1000
)

set_tag(tags.ERROR, not success)
return success
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ def _store_destination_rooms_entries_txn(

self.db_pool.simple_upsert_many_txn(
txn,
"destination_rooms",
["destination", "room_id"],
rows,
["stream_ordering"],
[(stream_ordering,)] * len(rows),
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
)

async def get_destination_last_successful_stream_ordering(
Expand Down