This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation Sender & Appservice Pusher Stream Optimisations #13251

Merged 6 commits on Jul 15, 2022
1 change: 1 addition & 0 deletions changelog.d/13251.misc
@@ -0,0 +1 @@
Optimise federation sender and appservice pusher event stream processing queries. Contributed by Nick @ Beeper (@fizzadar).
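For context, the core of the optimisation is that the stream query now returns a `received_ts` mapping alongside the events, so the handlers no longer issue one `get_received_ts` database query per event. A minimal sketch of the resulting calling pattern (the function name and the bare `store` argument below are invented for illustration):

```python
async def process_new_events(store, last_token: int, current_max: int) -> int:
    """Illustrative only: fetch a batch of new events plus their received_ts in one query."""
    # Before this change the handlers called store.get_received_ts(event.event_id)
    # once per event; the stream query now returns an {event_id: received_ts} mapping too.
    next_token, events, event_to_received_ts = await store.get_all_new_events_stream(
        last_token, current_max, limit=100
    )
    for event in events:
        ts = event_to_received_ts[event.event_id]  # no extra round-trip per event
        assert ts is not None
        # ... record event-processing lag metrics and dispatch the event here ...
    return next_token
```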
10 changes: 7 additions & 3 deletions synapse/federation/sender/__init__.py
@@ -351,7 +351,11 @@ async def _process_event_queue_loop(self) -> None:
self._is_processing = True
while True:
last_token = await self.store.get_federation_out_pos("events")
- next_token, events = await self.store.get_all_new_events_stream(
+ (
+ next_token,
+ events,
+ event_to_received_ts,
+ ) = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)

@@ -476,7 +480,7 @@ async def handle_event(event: EventBase) -> None:
await self._send_pdu(event, sharded_destinations)

now = self.clock.time_msec()
- ts = await self.store.get_received_ts(event.event_id)
+ ts = event_to_received_ts[event.event_id]
assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
@@ -509,7 +513,7 @@ async def handle_room_events(events: List[EventBase]) -> None:

if events:
now = self.clock.time_msec()
- ts = await self.store.get_received_ts(events[-1].event_id)
+ ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
11 changes: 6 additions & 5 deletions synapse/handlers/appservice.py
@@ -104,14 +104,15 @@ async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
- limit = 100
upper_bound = -1
while upper_bound < self.current_max:
+ last_token = await self.store.get_appservice_last_pos()
(
upper_bound,
events,
- ) = await self.store.get_new_events_for_appservice(
- self.current_max, limit
+ event_to_received_ts,
+ ) = await self.store.get_all_new_events_stream(
+ last_token, self.current_max, limit=100, get_prev_content=True
)

events_by_room: Dict[str, List[EventBase]] = {}
@@ -150,7 +151,7 @@ async def start_scheduler() -> None:
)

now = self.clock.time_msec()
- ts = await self.store.get_received_ts(event.event_id)
+ ts = event_to_received_ts[event.event_id]
assert ts is not None

synapse.metrics.event_processing_lag_by_event.labels(
@@ -187,7 +188,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:

if events:
now = self.clock.time_msec()
- ts = await self.store.get_received_ts(events[-1].event_id)
+ ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
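For orientation, a rough sketch of the appservice notification loop after this change. It is illustrative only (the standalone function, its name and its arguments are invented for the example, and the real handler does considerably more per batch, such as grouping events by room): the loop reads its persisted position with `get_appservice_last_pos`, reuses the shared `get_all_new_events_stream` query, which now also returns `received_ts` and can fetch previous event content, and then advances the stored position.

```python
async def poll_appservice_events(store, current_max: int) -> None:
    """Illustrative only: the rough shape of the appservice notification loop."""
    upper_bound = -1
    while upper_bound < current_max:
        # Read the persisted position instead of embedding it in the SQL query.
        last_token = await store.get_appservice_last_pos()
        (
            upper_bound,
            events,
            event_to_received_ts,
        ) = await store.get_all_new_events_stream(
            last_token, current_max, limit=100, get_prev_content=True
        )
        for event in events:
            ts = event_to_received_ts[event.event_id]  # batched received_ts lookup
            # ... notify interested application services, record lag metrics ...
        # Persist the new position so the next iteration resumes from here.
        await store.set_appservice_last_pos(upper_bound)
```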
58 changes: 18 additions & 40 deletions synapse/storage/databases/main/appservice.py
@@ -371,52 +371,30 @@ def _get_oldest_unsent_txn(
device_list_summary=DeviceListUpdates(),
)

- async def set_appservice_last_pos(self, pos: int) -> None:
- def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
- txn.execute(
- "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
- )
+ async def get_appservice_last_pos(self) -> int:
Member: please could you give this a docstring?

Contributor Author: Added in c59d717

"""
Get the last stream ordering position for the appservice process.
"""

- await self.db_pool.runInteraction(
- "set_appservice_last_pos", set_appservice_last_pos_txn
+ return await self.db_pool.simple_select_one_onecol(
+ table="appservice_stream_position",
+ retcol="stream_ordering",
+ keyvalues={},
+ desc="get_appservice_last_pos",
)

- async def get_new_events_for_appservice(
- self, current_id: int, limit: int
- ) -> Tuple[int, List[EventBase]]:
- """Get all new events for an appservice"""
-
- def get_new_events_for_appservice_txn(
- txn: LoggingTransaction,
- ) -> Tuple[int, List[str]]:
- sql = (
- "SELECT e.stream_ordering, e.event_id"
- " FROM events AS e"
- " WHERE"
- " (SELECT stream_ordering FROM appservice_stream_position)"
- " < e.stream_ordering"
- " AND e.stream_ordering <= ?"
- " ORDER BY e.stream_ordering ASC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (current_id, limit))
- rows = txn.fetchall()
-
- upper_bound = current_id
- if len(rows) == limit:
- upper_bound = rows[-1][0]
-
- return upper_bound, [row[1] for row in rows]
+ async def set_appservice_last_pos(self, pos: int) -> None:
+ """
+ Set the last stream ordering position for the appservice process.
+ """

- upper_bound, event_ids = await self.db_pool.runInteraction(
- "get_new_events_for_appservice", get_new_events_for_appservice_txn
+ await self.db_pool.simple_update_one(
+ table="appservice_stream_position",
+ keyvalues={},
+ updatevalues={"stream_ordering": pos},
+ desc="set_appservice_last_pos",
)

- events = await self.get_events_as_list(event_ids, get_prev_content=True)
-
- return upper_bound, events

async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
19 changes: 0 additions & 19 deletions synapse/storage/databases/main/events_worker.py
@@ -292,25 +292,6 @@ def process_replication_rows(

super().process_replication_rows(stream_name, instance_name, token, rows)

- async def get_received_ts(self, event_id: str) -> Optional[int]:
- """Get received_ts (when it was persisted) for the event.
-
- Raises an exception for unknown events.
-
- Args:
- event_id: The event ID to query.
-
- Returns:
- Timestamp in milliseconds, or None for events that were persisted
- before received_ts was implemented.
- """
- return await self.db_pool.simple_select_one_onecol(
- table="events",
- keyvalues={"event_id": event_id},
- retcol="received_ts",
- desc="get_received_ts",
- )
-
async def have_censored_event(self, event_id: str) -> bool:
"""Check if an event has been censored, i.e. if the content of the event has been erased
from the database due to a redaction.
32 changes: 20 additions & 12 deletions synapse/storage/databases/main/stream.py
@@ -1022,8 +1022,8 @@ def _get_events_around_txn(
}

async def get_all_new_events_stream(
- self, from_id: int, current_id: int, limit: int
- ) -> Tuple[int, List[EventBase]]:
+ self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
+ ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
Member: please update the docstring!

"""Get all new events

Returns all events with from_id < stream_ordering <= current_id.
@@ -1032,19 +1032,21 @@ async def get_all_new_events_stream(
from_id: the stream_ordering of the last event we processed
current_id: the stream_ordering of the most recently processed event
limit: the maximum number of events to return
+ get_prev_content: whether to fetch previous event content

Returns:
- A tuple of (next_id, events), where `next_id` is the next value to
- pass as `from_id` (it will either be the stream_ordering of the
- last returned event, or, if fewer than `limit` events were found,
- the `current_id`).
+ A tuple of (next_id, events, event_to_received_ts), where `next_id`
+ is the next value to pass as `from_id` (it will either be the
+ stream_ordering of the last returned event, or, if fewer than `limit`
+ events were found, the `current_id`). The `event_to_received_ts` is
+ a dictionary mapping event ID to the event `received_ts`.
"""

def get_all_new_events_stream_txn(
txn: LoggingTransaction,
- ) -> Tuple[int, List[str]]:
+ ) -> Tuple[int, Dict[str, Optional[int]]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
"SELECT e.stream_ordering, e.event_id, e.received_ts"
" FROM events AS e"
" WHERE"
" ? < e.stream_ordering AND e.stream_ordering <= ?"
@@ -1059,15 +1061,21 @@ def get_all_new_events_stream_txn(
if len(rows) == limit:
upper_bound = rows[-1][0]

- return upper_bound, [row[1] for row in rows]
+ event_to_received_ts: Dict[str, Optional[int]] = {
+ row[1]: row[2] for row in rows
+ }
+ return upper_bound, event_to_received_ts

- upper_bound, event_ids = await self.db_pool.runInteraction(
+ upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
"get_all_new_events_stream", get_all_new_events_stream_txn
)

- events = await self.get_events_as_list(event_ids)
+ events = await self.get_events_as_list(
+ event_to_received_ts.keys(),
+ get_prev_content=get_prev_content,
+ )

- return upper_bound, events
+ return upper_bound, events, event_to_received_ts

async def get_federation_out_pos(self, typ: str) -> int:
if self._need_to_reset_federation_stream_positions:
16 changes: 8 additions & 8 deletions tests/handlers/test_appservice.py
@@ -50,7 +50,7 @@ def setUp(self):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastores.return_value = Mock(main=self.mock_store)
- self.mock_store.get_received_ts.return_value = make_awaitable(0)
+ self.mock_store.get_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
None
@@ -76,9 +76,9 @@ def test_notify_interested_services(self):
event = Mock(
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
)
- self.mock_store.get_new_events_for_appservice.side_effect = [
- make_awaitable((0, [])),
- make_awaitable((1, [event])),
+ self.mock_store.get_all_new_events_stream.side_effect = [
+ make_awaitable((0, [], {})),
+ make_awaitable((1, [event], {event.event_id: 0})),
]
self.handler.notify_interested_services(RoomStreamToken(None, 1))

@@ -95,8 +95,8 @@ def test_query_user_exists_unknown_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
- self.mock_store.get_new_events_for_appservice.side_effect = [
- make_awaitable((0, [event])),
+ self.mock_store.get_all_new_events_stream.side_effect = [
+ make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
@@ -112,8 +112,8 @@ def test_query_user_exists_known_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
- self.mock_store.get_new_events_for_appservice.side_effect = [
- make_awaitable((0, [event])),
+ self.mock_store.get_all_new_events_stream.side_effect = [
+ make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))