Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3195 from matrix-org/erikj/pagination_refactor
Browse files Browse the repository at this point in the history
 Refactor recent events func to use pagination func
  • Loading branch information
erikjohnston authored May 9, 2018
2 parents 6059021 + 7dd1341 commit 0461ef0
Showing 1 changed file with 27 additions and 51 deletions.
78 changes: 27 additions & 51 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore

from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -347,9 +346,9 @@ def f(txn):
defer.returnValue(ret)

@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
def get_recent_events_for_room(self, room_id, limit, end_token):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
room_id, limit, end_token,
)

logger.debug("stream before")
Expand All @@ -363,60 +362,37 @@ def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None)

defer.returnValue((events, token))

@cached(num_args=4)
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)

if from_token is None:
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
else:
from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering > ?"
" AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)

def get_recent_events_for_room_txn(txn):
if from_token is None:
txn.execute(sql, (room_id, end_token.stream, False, limit,))
else:
txn.execute(sql, (
room_id, from_token.stream, end_token.stream, False, limit
))
@defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
"""Get the most recent events in the room in topological ordering.
rows = self.cursor_to_dict(txn)
Args:
room_id (str)
limit (int)
end_token (str): The stream token representing now.
rows.reverse() # As we selected with reverse ordering
Returns:
Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
dicts (which include event_ids, etc), and a tuple for
`(start_token, end_token)` representing the range of rows
returned.
The returned events are in ascending order.
"""
# Allow a zero limit here, and no-op.
if limit == 0:
defer.returnValue(([], (end_token, end_token)))

if rows:
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# since we are going backwards so we subtract one from the
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
start_token = str(RoomStreamToken(topo, toke))
end_token = RoomStreamToken.parse_stream_token(end_token)

token = (start_token, str(end_token))
else:
token = (str(end_token), str(end_token))
rows, token = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)

return rows, token
# We want to return the results in ascending order.
rows.reverse()

return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
defer.returnValue((rows, (token, str(end_token))))

def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
Expand Down

0 comments on commit 0461ef0

Please sign in to comment.