From e63cb7d4e210a50495ee9cff9965579e7fb562ff Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 20 Oct 2020 17:08:54 +0100 Subject: [PATCH 1/4] Limit AS transactions to 100 events --- changelog.d/8606.feature | 1 + synapse/appservice/scheduler.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 changelog.d/8606.feature diff --git a/changelog.d/8606.feature b/changelog.d/8606.feature new file mode 100644 index 000000000000..6e257c41ce76 --- /dev/null +++ b/changelog.d/8606.feature @@ -0,0 +1 @@ +Appservice transactions are limited to 100 persistent and 100 ephemeral events. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index ad3c408519ee..3aff89d38cdf 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -59,6 +59,8 @@ logger = logging.getLogger(__name__) +MAX_EVENTS_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """ Public facing API for this module. Does the required DI to tie the @@ -136,10 +138,17 @@ async def _send_request(self, service: ApplicationService): self.requests_in_flight.add(service.id) try: while True: - events = self.queued_events.pop(service.id, []) - ephemeral = self.queued_ephemeral.pop(service.id, []) + all_events = self.queued_events.get(service.id, []) + events = all_events[:MAX_EVENTS_PER_TRANSACTION] + del all_events[MAX_EVENTS_PER_TRANSACTION:] + + all_events_ephemeral = self.queued_ephemeral.get(service.id, []) + ephemeral = all_events_ephemeral[:MAX_EVENTS_PER_TRANSACTION] + del all_events_ephemeral[MAX_EVENTS_PER_TRANSACTION:] + if not events and not ephemeral: return + try: await self.txn_ctrl.send(service, events, ephemeral) except Exception: From 042677cde93dd1001548e2a3cf0896e223a7c471 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 20 Oct 2020 17:33:57 +0100 Subject: [PATCH 2/4] Update changelog.d/8606.feature Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/8606.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/8606.feature b/changelog.d/8606.feature index 6e257c41ce76..fad723c10859 100644 --- a/changelog.d/8606.feature +++ b/changelog.d/8606.feature @@ -1 +1 @@ -Appservice transactions are limited to 100 persistent and 100 ephemeral events. +Limit appservice transactions to 100 persistent and 100 ephemeral events. From c423acfc4cd7fd9a457048d201dcba63ffb5e907 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 20 Oct 2020 17:55:16 +0100 Subject: [PATCH 3/4] Add tests --- synapse/appservice/scheduler.py | 15 +++++++---- tests/appservice/test_scheduler.py | 41 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3aff89d38cdf..acd2b3558dce 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -59,7 +59,12 @@ logger = logging.getLogger(__name__) -MAX_EVENTS_PER_TRANSACTION = 100 + +# Maximum number of events to provide in a AS transaction. +MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 + +# Maximum number of ephemeral events to provide in a AS transaction. +MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 class ApplicationServiceScheduler: @@ -139,12 +144,12 @@ async def _send_request(self, service: ApplicationService): try: while True: all_events = self.queued_events.get(service.id, []) - events = all_events[:MAX_EVENTS_PER_TRANSACTION] - del all_events[MAX_EVENTS_PER_TRANSACTION:] + events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION] + del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION] all_events_ephemeral = self.queued_ephemeral.get(service.id, []) - ephemeral = all_events_ephemeral[:MAX_EVENTS_PER_TRANSACTION] - del all_events_ephemeral[MAX_EVENTS_PER_TRANSACTION:] + ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] + del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] if not events and not ephemeral: return diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 2acb8b7603b0..97f8cad0ddd4 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -260,6 +260,31 @@ def do_send(x, y, z): self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) self.assertEquals(3, self.txn_ctrl.send.call_count) + def test_send_large_txns(self): + srv_1_defer = defer.Deferred() + srv_2_defer = defer.Deferred() + send_return_list = [srv_1_defer, srv_2_defer] + + def do_send(x, y, z): + return make_deferred_yieldable(send_return_list.pop(0)) + + self.txn_ctrl.send = Mock(side_effect=do_send) + + service = Mock(id=4, name="service") + event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)] + for event in event_list: + self.queuer.enqueue_event(service, event) + + # Expect the first event to be sent immediately. + self.txn_ctrl.send.assert_called_with(service, [event_list[0]], []) + srv_1_defer.callback(service) + # Then send the next 100 events + self.txn_ctrl.send.assert_called_with(service, event_list[1:101], []) + srv_2_defer.callback(service) + # Then the final 99 events + self.txn_ctrl.send.assert_called_with(service, event_list[101:], []) + self.assertEquals(3, self.txn_ctrl.send.call_count) + def test_send_single_ephemeral_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4, name="service") @@ -296,3 +321,19 @@ def test_send_single_ephemeral_with_queue(self): # Expect the queued events to be sent self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) self.assertEquals(2, self.txn_ctrl.send.call_count) + + def test_send_large_txns_ephemeral(self): + d = defer.Deferred() + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) + # Expect the event to be sent immediately. + service = Mock(id=4, name="service") + first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)] + second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)] + event_list = first_chunk + second_chunk + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk) + d.callback(service) + self.txn_ctrl.send.assert_called_with(service, [], second_chunk) + self.assertEquals(2, self.txn_ctrl.send.call_count) From 1d6f8e6aa57fa3c8a76b221b9588ad4a8392aab3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 21 Oct 2020 15:03:35 +0100 Subject: [PATCH 4/4] Update synapse/appservice/scheduler.py --- synapse/appservice/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index acd2b3558dce..58291afc2231 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -60,10 +60,10 @@ logger = logging.getLogger(__name__) -# Maximum number of events to provide in a AS transaction. +# Maximum number of events to provide in an AS transaction. MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 -# Maximum number of ephemeral events to provide in a AS transaction. +# Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100