diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3aff89d38cdf..acd2b3558dce 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -59,7 +59,12 @@ logger = logging.getLogger(__name__) -MAX_EVENTS_PER_TRANSACTION = 100 + +# Maximum number of events to provide in a AS transaction. +MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 + +# Maximum number of ephemeral events to provide in a AS transaction. +MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 class ApplicationServiceScheduler: @@ -139,12 +144,12 @@ async def _send_request(self, service: ApplicationService): try: while True: all_events = self.queued_events.get(service.id, []) - events = all_events[:MAX_EVENTS_PER_TRANSACTION] - del all_events[MAX_EVENTS_PER_TRANSACTION:] + events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION] + del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION] all_events_ephemeral = self.queued_ephemeral.get(service.id, []) - ephemeral = all_events_ephemeral[:MAX_EVENTS_PER_TRANSACTION] - del all_events_ephemeral[MAX_EVENTS_PER_TRANSACTION:] + ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] + del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] if not events and not ephemeral: return diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 2acb8b7603b0..97f8cad0ddd4 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -260,6 +260,31 @@ def do_send(x, y, z): self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) self.assertEquals(3, self.txn_ctrl.send.call_count) + def test_send_large_txns(self): + srv_1_defer = defer.Deferred() + srv_2_defer = defer.Deferred() + send_return_list = [srv_1_defer, srv_2_defer] + + def do_send(x, y, z): + return make_deferred_yieldable(send_return_list.pop(0)) + + self.txn_ctrl.send = Mock(side_effect=do_send) + + service = Mock(id=4, name="service") + event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)] + for event in event_list: + self.queuer.enqueue_event(service, event) + + # Expect the first event to be sent immediately. + self.txn_ctrl.send.assert_called_with(service, [event_list[0]], []) + srv_1_defer.callback(service) + # Then send the next 100 events + self.txn_ctrl.send.assert_called_with(service, event_list[1:101], []) + srv_2_defer.callback(service) + # Then the final 99 events + self.txn_ctrl.send.assert_called_with(service, event_list[101:], []) + self.assertEquals(3, self.txn_ctrl.send.call_count) + def test_send_single_ephemeral_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4, name="service") @@ -296,3 +321,19 @@ def test_send_single_ephemeral_with_queue(self): # Expect the queued events to be sent self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) self.assertEquals(2, self.txn_ctrl.send.call_count) + + def test_send_large_txns_ephemeral(self): + d = defer.Deferred() + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) + # Expect the event to be sent immediately. + service = Mock(id=4, name="service") + first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)] + second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)] + event_list = first_chunk + second_chunk + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk) + d.callback(service) + self.txn_ctrl.send.assert_called_with(service, [], second_chunk) + self.assertEquals(2, self.txn_ctrl.send.call_count)