Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Limit AS transactions to 100 events #8606

Merged
merged 4 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8606.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Limit appservice transactions to 100 persistent and 100 ephemeral events.
18 changes: 16 additions & 2 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@
logger = logging.getLogger(__name__)


# Maximum number of events to provide in an AS transaction.
MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100

# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100


class ApplicationServiceScheduler:
""" Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this
Expand Down Expand Up @@ -136,10 +143,17 @@ async def _send_request(self, service: ApplicationService):
self.requests_in_flight.add(service.id)
try:
while True:
events = self.queued_events.pop(service.id, [])
ephemeral = self.queued_ephemeral.pop(service.id, [])
all_events = self.queued_events.get(service.id, [])
events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]

all_events_ephemeral = self.queued_ephemeral.get(service.id, [])
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]

if not events and not ephemeral:
return

try:
await self.txn_ctrl.send(service, events, ephemeral)
except Exception:
Expand Down
41 changes: 41 additions & 0 deletions tests/appservice/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,31 @@ def do_send(x, y, z):
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
self.assertEquals(3, self.txn_ctrl.send.call_count)

def test_send_large_txns(self):
srv_1_defer = defer.Deferred()
srv_2_defer = defer.Deferred()
send_return_list = [srv_1_defer, srv_2_defer]

def do_send(x, y, z):
return make_deferred_yieldable(send_return_list.pop(0))

self.txn_ctrl.send = Mock(side_effect=do_send)

service = Mock(id=4, name="service")
event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)]
for event in event_list:
self.queuer.enqueue_event(service, event)

# Expect the first event to be sent immediately.
self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [])
srv_1_defer.callback(service)
# Then send the next 100 events
self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [])
srv_2_defer.callback(service)
# Then the final 99 events
self.txn_ctrl.send.assert_called_with(service, event_list[101:], [])
self.assertEquals(3, self.txn_ctrl.send.call_count)

def test_send_single_ephemeral_no_queue(self):
# Expect the event to be sent immediately.
service = Mock(id=4, name="service")
Expand Down Expand Up @@ -296,3 +321,19 @@ def test_send_single_ephemeral_with_queue(self):
# Expect the queued events to be sent
self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
self.assertEquals(2, self.txn_ctrl.send.call_count)

def test_send_large_txns_ephemeral(self):
d = defer.Deferred()
self.txn_ctrl.send = Mock(
side_effect=lambda x, y, z: make_deferred_yieldable(d)
)
# Expect the event to be sent immediately.
service = Mock(id=4, name="service")
first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)]
second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)]
event_list = first_chunk + second_chunk
self.queuer.enqueue_ephemeral(service, event_list)
self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk)
d.callback(service)
self.txn_ctrl.send.assert_called_with(service, [], second_chunk)
self.assertEquals(2, self.txn_ctrl.send.call_count)