From dd2112e9e25ceb99989a5aaa8e55d3d4ea93305d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 19:01:58 -0500 Subject: [PATCH] Generate historic pagination token for /messages when no from token provided Part of https://github.com/matrix-org/synapse/issues/12281 Context: https://github.com/matrix-org/synapse/pull/12319#discussion_r840276412 --- scripts-dev/complement.sh | 2 +- synapse/handlers/pagination.py | 51 +++++++++++++++++++++++- synapse/handlers/room.py | 2 +- synapse/python_dependencies.py | 1 + synapse/storage/databases/main/stream.py | 47 ++++++++++++++++++++-- synapse/streams/events.py | 6 +-- 6 files changed, 98 insertions(+), 11 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index d1b59ff0401b..919016abc085 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -71,4 +71,4 @@ fi # Run the tests! echo "Images built; running complement" -go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... +go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/ diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 876b879483e7..218da77a59e1 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, RoomStreamToken from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -441,7 +441,54 @@ async def get_messages( if pagin_config.from_token: from_token = pagin_config.from_token else: - from_token = self.hs.get_event_sources().get_current_token_for_pagination() + from_token = ( + await self.hs.get_event_sources().get_current_token_for_pagination( + room_id + ) + ) + assert from_token.room_key.topological + # from_live_token = ( + # self.hs.get_event_sources().get_current_token_for_pagination() + # ) + # # Convert the live token (sXXX) into a historic token (tXXX-XXX) + # # which is more suitable for /messages. + # current_stream_ordering = from_live_token.room_key.stream + # current_topographical_ordering = ( + # await self.store.get_current_topological_token( + # room_id, current_stream_ordering + # ) + # ) + # from_token = from_live_token.copy_and_replace( + # "room_key", + # RoomStreamToken( + # current_topographical_ordering, current_stream_ordering + # ), + # ) + # logger.info( + # "get_messages(room_id=%s)\n\tfrom_token=%s\n\tcurrent_stream_ordering=%s\n\tcurrent_topographical_ordering=%s\n\tfrom_live_token=%s", + # room_id, + # await from_token.to_string(self.store), + # current_stream_ordering, + # current_topographical_ordering, + # await from_live_token.to_string(self.store), + # ) + logger.info( + "get_messages(room_id=%s)\n\tfrom_token=%s", + room_id, + await from_token.to_string(self.store), + ) + logger.info( + "asdf_get_debug_events_in_room_ordered_by_depth %s", + await self.store.asdf_get_debug_events_in_room_ordered_by_depth( + room_id + ), + ) + logger.info( + "asdf_get_debug_events_in_room_ordered_by_stream_ordering %s", + await self.store.asdf_get_debug_events_in_room_ordered_by_stream_ordering( + room_id + ), + ) if pagin_config.limit is None: # This shouldn't happen as we've set a default limit before this diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51a08fd2c08d..793cb30c7976 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1444,7 +1444,7 @@ async def get_new_events( def get_current_key(self) -> RoomStreamToken: return self.store.get_room_max_token() - def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: + def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: return self.store.get_room_events_max_id(room_id) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8419ab3aca95..0623a2a58ad6 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -89,6 +89,7 @@ "matrix-common~=1.1.0", # We need packaging.requirements.Requirement, added in 16.1. "packaging>=16.1", + "tabulate>=0.8.9", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8e764790dbc2..0227a9e01035 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -42,6 +42,7 @@ from frozendict import frozendict from twisted.internet import defer +from tabulate import tabulate from synapse.api.filtering import Filter from synapse.events import EventBase @@ -748,21 +749,21 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) - async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str: + async def get_room_events_max_id(self, room_id: Optional[str] = None) -> RoomStreamToken: """Returns the current token for rooms stream. By default, it returns the current global stream token. Specifying a `room_id` causes it to return the current room specific topological token. """ - token = self.get_room_max_stream_ordering() + stream_ordering = self.get_room_max_stream_ordering() if room_id is None: - return "s%d" % (token,) + return RoomStreamToken(None, stream_ordering) else: topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return "t%d-%d" % (topo, token) + return RoomStreamToken(topo, stream_ordering) def get_stream_id_for_event_txn( self, @@ -808,6 +809,44 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke ) return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + async def asdf_get_debug_events_in_room_ordered_by_depth(self, room_id: str) -> Any: + """Gets the topological token in a room after or at the given stream + ordering. + + Args: + room_id + """ + sql = ( + "SELECT depth, stream_ordering, type, state_key, event_id FROM events" + " WHERE events.room_id = ?" + " ORDER BY depth DESC, stream_ordering DESC;" + ) + rows = await self.db_pool.execute( + "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id + ) + + headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] + return tabulate(rows, headers=headers) + + async def asdf_get_debug_events_in_room_ordered_by_stream_ordering(self, room_id: str) -> Any: + """Gets the topological token in a room after or at the given stream + ordering. + + Args: + room_id + """ + sql = ( + "SELECT depth, stream_ordering, type, state_key, event_id FROM events" + " WHERE events.room_id = ?" + " ORDER BY stream_ordering DESC, depth DESC;" + ) + rows = await self.db_pool.execute( + "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id + ) + + headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] + return tabulate(rows, headers=headers) + async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream ordering. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index fb8fe1729516..09341f935c15 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Tuple +from typing import TYPE_CHECKING, Iterator, Optional, Tuple import attr @@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken: ) return token - def get_current_token_for_pagination(self) -> StreamToken: + async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: """Get the current token for a given room to be used to paginate events. @@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken: The current token for pagination. """ token = StreamToken( - room_key=self.sources.room.get_current_key(), + room_key=await self.sources.room.get_current_key_for_room(room_id), presence_key=0, typing_key=0, receipt_key=0,