Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix AssertionErrors being thrown by EventsStream
Browse files Browse the repository at this point in the history
Part of the problem was that there was an off-by-one error in the assertion,
but also the limit logic was too simple. Fix it all up and add some tests.
  • Loading branch information
richvdh committed Apr 28, 2020
1 parent b21490b commit 3778424
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 11 deletions.
28 changes: 21 additions & 7 deletions synapse/replication/tcp/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,30 @@ async def _update_function(
from_token, upper_limit, target_row_count
) # type: List[Tuple]

# again, if we've hit the limit there, we'll need to limit the other sources
assert len(state_rows) < target_row_count
assert len(state_rows) <= target_row_count

# there can be more than one row per stream_id in that table, so if we hit
# the limit there, we'll need to truncate the results so that we have a complete
# set of changes for all the stream IDs we include.
if len(state_rows) == target_row_count:
assert state_rows[-1][0] <= upper_limit
upper_limit = state_rows[-1][0]
limited = True
upper_limit = state_rows[-1][0] - 1

# search for the point to truncate the list
for idx in range(len(state_rows) - 1, 0, -1):
if state_rows[idx - 1][0] <= upper_limit:
state_rows = state_rows[:idx]
break
else:
# bother. We didn't get a full set of changes for even a single
# stream id. let's run the query again, without a row limit, but for
# just one stream id.
upper_limit += 1
state_rows = await self._store.get_all_updated_current_state_deltas(
from_token, upper_limit, limit=None
)

# FIXME: is it a given that there is only one row per stream_id in the
# state_deltas table (so that we can be sure that we have got all of the
# rows for upper_limit)?
limited = True

# finally, fetch the ex-outliers rows. We assume there are few enough of these
# not to bother with the limit.
Expand Down
14 changes: 11 additions & 3 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,15 +1084,23 @@ def get_all_new_backfill_event_rows(txn):
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)

def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
def get_all_updated_current_state_deltas(
self, from_token: int, to_token: int, limit: Optional[int]
):
def get_all_updated_current_state_deltas_txn(txn):
sql = """
SELECT stream_id, room_id, type, state_key, event_id
FROM current_state_delta_stream
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC LIMIT ?
ORDER BY stream_id ASC
"""
txn.execute(sql, (from_token, to_token, limit))
params = [from_token, to_token]

if limit is not None:
sql += "LIMIT ?"
params.append(limit)

txn.execute(sql, params)
return txn.fetchall()

return self.db.runInteraction(
Expand Down
Loading

0 comments on commit 3778424

Please sign in to comment.