Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor /context to reuse pagination storage functions #3193

Merged
merged 5 commits into from
May 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 99 additions & 110 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.engines import PostgresEngine

import abc
import logging
Expand Down Expand Up @@ -595,88 +595,28 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim
retcols=["stream_ordering", "topological_ordering"],
)

token = RoomStreamToken(
results["topological_ordering"],
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1,
results["stream_ordering"],
)

if isinstance(self.database_engine, Sqlite3Engine):
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
# So we give pass it to SQLite3 as the UNION ALL of the two queries.

query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering < ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
)
before_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
before_limit,
)

query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering > ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
)
after_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
after_limit,
)
else:
query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
) % (upper_bound(token, self.database_engine, inclusive=False),)

before_args = (room_id, before_limit)

query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
) % (lower_bound(token, self.database_engine, inclusive=False),)

after_args = (room_id, after_limit)

txn.execute(query_before, before_args)
after_token = RoomStreamToken(
results["topological_ordering"],
results["stream_ordering"],
)

rows = self.cursor_to_dict(txn)
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r["event_id"] for r in rows]

if rows:
start_token = str(RoomStreamToken(
rows[0]["topological_ordering"],
rows[0]["stream_ordering"] - 1,
))
else:
start_token = str(RoomStreamToken(
token.topological,
token.stream - 1,
))

txn.execute(query_after, after_args)

rows = self.cursor_to_dict(txn)
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r["event_id"] for r in rows]

if rows:
end_token = str(RoomStreamToken(
rows[-1]["topological_ordering"],
rows[-1]["stream_ordering"],
))
else:
end_token = str(token)

return {
"before": {
"event_ids": events_before,
Expand Down Expand Up @@ -738,38 +678,49 @@ def update_federation_out_pos(self, typ, stream_id):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)

def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.

class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
Args:
txn
room_id (str)
from_token (RoomStreamToken): The token used to stream from
to_token (RoomStreamToken|None): A token which if given limits the
results to only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.

@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_ordering".
"""
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))

filter_clause, filter_args = filter_to_clause(event_filter)
Expand All @@ -785,7 +736,8 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
limit_str = ""

sql = (
"SELECT * FROM events"
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
Expand All @@ -795,29 +747,58 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
"limit": limit_str
}

def f(txn):
txn.execute(sql, args)
txn.execute(sql, args)

rows = self.cursor_to_dict(txn)
rows = self.cursor_to_dict(txn)

if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token

return rows, next_token,
return rows, str(next_token),

rows, token = yield self.runInteraction("paginate_room_events", f)
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.

Args:
room_id (str)
from_key (str): The token used to stream from
to_key (str|None): A token which if given limits the results to
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.

Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "toplogical_ordering" and "stream_orderign".
"""

from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)

rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
)

events = yield self._get_events(
[r["event_id"] for r in rows],
Expand All @@ -827,3 +808,11 @@ def f(txn):
self._set_before_and_after(events, rows)

defer.returnValue((events, token))


class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()

def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()