Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
Browse files Browse the repository at this point in the history
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream`

The functions were near identical and this brings the AS worker closer
to the way federation senders work which can allow for multiple workers
to handle AS traffic.

* Pull received TS alongside events when processing the stream

This avoids an extra query -per event- when both federation sender
and appservice pusher process events.
  • Loading branch information
Fizzadar authored Jul 15, 2022
1 parent fe15a86 commit 21eeacc
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 87 deletions.
1 change: 1 addition & 0 deletions changelog.d/13251.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise federation sender and appservice pusher event stream processing queries. Contributed by Nick @ Beeper (@fizzadar).
10 changes: 7 additions & 3 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ async def _process_event_queue_loop(self) -> None:
self._is_processing = True
while True:
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
(
next_token,
events,
event_to_received_ts,
) = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)

Expand Down Expand Up @@ -476,7 +480,7 @@ async def handle_event(event: EventBase) -> None:
await self._send_pdu(event, sharded_destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
ts = event_to_received_ts[event.event_id]
assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
Expand Down Expand Up @@ -509,7 +513,7 @@ async def handle_room_events(events: List[EventBase]) -> None:

if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
Expand Down
11 changes: 6 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
limit = 100
upper_bound = -1
while upper_bound < self.current_max:
last_token = await self.store.get_appservice_last_pos()
(
upper_bound,
events,
) = await self.store.get_new_events_for_appservice(
self.current_max, limit
event_to_received_ts,
) = await self.store.get_all_new_events_stream(
last_token, self.current_max, limit=100, get_prev_content=True
)

events_by_room: Dict[str, List[EventBase]] = {}
Expand Down Expand Up @@ -150,7 +151,7 @@ async def start_scheduler() -> None:
)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
ts = event_to_received_ts[event.event_id]
assert ts is not None

synapse.metrics.event_processing_lag_by_event.labels(
Expand Down Expand Up @@ -187,7 +188,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:

if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
Expand Down
58 changes: 18 additions & 40 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,52 +371,30 @@ def _get_oldest_unsent_txn(
device_list_summary=DeviceListUpdates(),
)

async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)
async def get_appservice_last_pos(self) -> int:
"""
Get the last stream ordering position for the appservice process.
"""

await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
return await self.db_pool.simple_select_one_onecol(
table="appservice_stream_position",
retcol="stream_ordering",
keyvalues={},
desc="get_appservice_last_pos",
)

async def get_new_events_for_appservice(
self, current_id: int, limit: int
) -> Tuple[int, List[EventBase]]:
"""Get all new events for an appservice"""

def get_new_events_for_appservice_txn(
txn: LoggingTransaction,
) -> Tuple[int, List[str]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
" FROM events AS e"
" WHERE"
" (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering"
" AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)

txn.execute(sql, (current_id, limit))
rows = txn.fetchall()

upper_bound = current_id
if len(rows) == limit:
upper_bound = rows[-1][0]

return upper_bound, [row[1] for row in rows]
async def set_appservice_last_pos(self, pos: int) -> None:
"""
Set the last stream ordering position for the appservice process.
"""

upper_bound, event_ids = await self.db_pool.runInteraction(
"get_new_events_for_appservice", get_new_events_for_appservice_txn
await self.db_pool.simple_update_one(
table="appservice_stream_position",
keyvalues={},
updatevalues={"stream_ordering": pos},
desc="set_appservice_last_pos",
)

events = await self.get_events_as_list(event_ids, get_prev_content=True)

return upper_bound, events

async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
Expand Down
19 changes: 0 additions & 19 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,25 +292,6 @@ def process_replication_rows(

super().process_replication_rows(stream_name, instance_name, token, rows)

async def get_received_ts(self, event_id: str) -> Optional[int]:
"""Get received_ts (when it was persisted) for the event.
Raises an exception for unknown events.
Args:
event_id: The event ID to query.
Returns:
Timestamp in milliseconds, or None for events that were persisted
before received_ts was implemented.
"""
return await self.db_pool.simple_select_one_onecol(
table="events",
keyvalues={"event_id": event_id},
retcol="received_ts",
desc="get_received_ts",
)

async def have_censored_event(self, event_id: str) -> bool:
"""Check if an event has been censored, i.e. if the content of the event has been erased
from the database due to a redaction.
Expand Down
32 changes: 20 additions & 12 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,8 +1022,8 @@ def _get_events_around_txn(
}

async def get_all_new_events_stream(
self, from_id: int, current_id: int, limit: int
) -> Tuple[int, List[EventBase]]:
self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
"""Get all new events
Returns all events with from_id < stream_ordering <= current_id.
Expand All @@ -1032,19 +1032,21 @@ async def get_all_new_events_stream(
from_id: the stream_ordering of the last event we processed
current_id: the stream_ordering of the most recently processed event
limit: the maximum number of events to return
get_prev_content: whether to fetch previous event content
Returns:
A tuple of (next_id, events), where `next_id` is the next value to
pass as `from_id` (it will either be the stream_ordering of the
last returned event, or, if fewer than `limit` events were found,
the `current_id`).
A tuple of (next_id, events, event_to_received_ts), where `next_id`
is the next value to pass as `from_id` (it will either be the
stream_ordering of the last returned event, or, if fewer than `limit`
events were found, the `current_id`). The `event_to_received_ts` is
a dictionary mapping event ID to the event `received_ts`.
"""

def get_all_new_events_stream_txn(
txn: LoggingTransaction,
) -> Tuple[int, List[str]]:
) -> Tuple[int, Dict[str, Optional[int]]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
"SELECT e.stream_ordering, e.event_id, e.received_ts"
" FROM events AS e"
" WHERE"
" ? < e.stream_ordering AND e.stream_ordering <= ?"
Expand All @@ -1059,15 +1061,21 @@ def get_all_new_events_stream_txn(
if len(rows) == limit:
upper_bound = rows[-1][0]

return upper_bound, [row[1] for row in rows]
event_to_received_ts: Dict[str, Optional[int]] = {
row[1]: row[2] for row in rows
}
return upper_bound, event_to_received_ts

upper_bound, event_ids = await self.db_pool.runInteraction(
upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
"get_all_new_events_stream", get_all_new_events_stream_txn
)

events = await self.get_events_as_list(event_ids)
events = await self.get_events_as_list(
event_to_received_ts.keys(),
get_prev_content=get_prev_content,
)

return upper_bound, events
return upper_bound, events, event_to_received_ts

async def get_federation_out_pos(self, typ: str) -> int:
if self._need_to_reset_federation_stream_positions:
Expand Down
16 changes: 8 additions & 8 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def setUp(self):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastores.return_value = Mock(main=self.mock_store)
self.mock_store.get_received_ts.return_value = make_awaitable(0)
self.mock_store.get_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
None
Expand All @@ -76,9 +76,9 @@ def test_notify_interested_services(self):
event = Mock(
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [])),
make_awaitable((1, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [], {})),
make_awaitable((1, [event], {event.event_id: 0})),
]
self.handler.notify_interested_services(RoomStreamToken(None, 1))

Expand All @@ -95,8 +95,8 @@ def test_query_user_exists_unknown_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand All @@ -112,8 +112,8 @@ def test_query_user_exists_known_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand Down

0 comments on commit 21eeacc

Please sign in to comment.