Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Clean up code for sending federation EDUs. (#5381)
Browse files Browse the repository at this point in the history
This code confused the hell out of me today. Split _get_new_device_messages
into its two (unrelated) parts.
  • Loading branch information
richvdh committed Jun 13, 2019
1 parent 6312d6c commit 5c15039
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
1 change: 1 addition & 0 deletions changelog.d/5381.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up code for sending federation EDUs.
40 changes: 26 additions & 14 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,21 @@ def _transaction_transmission_loop(self):

pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
# We have to keep 2 free slots for presence and rr_edus
yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

device_update_edus, dev_list_id = (
yield self._get_device_update_edus(limit)
)

limit -= len(device_update_edus)

to_device_edus, device_stream_id = (
yield self._get_to_device_message_edus(limit)
)

pending_edus = device_update_edus + to_device_edus

# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
Expand All @@ -208,10 +218,6 @@ def _transaction_transmission_loop(self):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]

pending_edus = []

# We can only include at most 100 EDUs per transactions
# rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
Expand All @@ -232,7 +238,6 @@ def _transaction_transmission_loop(self):
)
)

pending_edus.extend(device_message_edus)
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
Expand Down Expand Up @@ -272,10 +277,13 @@ def _transaction_transmission_loop(self):
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
Expand Down Expand Up @@ -347,7 +355,7 @@ def _pop_pending_edus(self, limit):
return pending_edus

@defer.inlineCallbacks
def _get_new_device_messages(self, limit):
def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id

# Retrieve list of new device updates to send to the destination
Expand All @@ -366,22 +374,26 @@ def _get_new_device_messages(self, limit):

assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"

defer.returnValue((edus, now_stream_id))

@defer.inlineCallbacks
def _get_to_device_message_edus(self, limit):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination,
last_device_stream_id,
to_device_stream_id,
limit - len(edus),
limit,
)
edus.extend(
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
)
]

defer.returnValue((edus, stream_id, now_stream_id))
defer.returnValue((edus, stream_id))

0 comments on commit 5c15039

Please sign in to comment.