Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add endpoint to get an event at a given timestamp - MSC3030 #9445

Merged
merged 55 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
91b1b36
Add endpoint to get an event at a given timestamp
erikjohnston Feb 19, 2021
8e5fa11
Update to latest REST servlet standards
MadLittleMods Jul 15, 2021
96c48ba
Merge branch 'develop' into eric/jump-to-date
MadLittleMods Jul 26, 2021
668aa4e
Remove thread_id
MadLittleMods Jul 27, 2021
5b07487
Use updated method name check_joined_room -> check_user_in_room
MadLittleMods Jul 27, 2021
f721899
Use correct requester to sender access method
MadLittleMods Jul 27, 2021
af085ab
Use up-to-date db interaction method
MadLittleMods Jul 27, 2021
065273b
Add Complement MSC feature flag
MadLittleMods Jul 28, 2021
0e0ddda
Merge branch 'develop' into eric/jump-to-date
MadLittleMods Nov 10, 2021
e321ef7
Fix query sort order so it returns the closest event before/after
MadLittleMods Nov 12, 2021
e21e4b5
Add ?dir parameter
MadLittleMods Nov 12, 2021
fa15989
Add federated /timestamp_to_event endpoint and logic to ask
MadLittleMods Nov 17, 2021
ec2695d
Determine forward extremity by edges
MadLittleMods Nov 17, 2021
22a93c3
Add experimental feature flag for MSC3030
MadLittleMods Nov 17, 2021
b311853
Document and move to unstable prefixes
MadLittleMods Nov 17, 2021
5638123
Add changelog
MadLittleMods Nov 17, 2021
612b51f
Fix lints
MadLittleMods Nov 17, 2021
654d7ae
Fix lint
MadLittleMods Nov 17, 2021
6280d36
Fix lint
MadLittleMods Nov 17, 2021
8766b0a
Filter events before handing them over federation
MadLittleMods Nov 25, 2021
86a2642
Don't spam logs for 404's
MadLittleMods Nov 25, 2021
5a2c997
Allow returning event_ids for hidden events according to history visi…
MadLittleMods Nov 25, 2021
bc3ba38
Ignore rejected events and look for gaps instead
MadLittleMods Nov 25, 2021
edac953
We only return event or raise an exception
MadLittleMods Nov 25, 2021
ab800e3
Only return ValueError from storage layer
MadLittleMods Nov 25, 2021
9800a4b
Add docstring
MadLittleMods Nov 25, 2021
8523bf3
Better description for event_next_to_gap
MadLittleMods Nov 25, 2021
f05c292
Fix lints
MadLittleMods Nov 25, 2021
984a14b
Merge branch 'develop' into eric/jump-to-date
MadLittleMods Nov 25, 2021
dae7e0a
Remove filter_events_for_server because always ok to see event_id
MadLittleMods Nov 29, 2021
87ac1ed
Rename to is_event_next_to_gap
MadLittleMods Nov 29, 2021
2a5b622
Remove extra space typo
MadLittleMods Nov 29, 2021
0610fac
Include inclusive comment on all comment docs
MadLittleMods Nov 29, 2021
183e1bf
Remove redundant continues
MadLittleMods Nov 29, 2021
5362bd3
Fix query rejections and edge cases
MadLittleMods Nov 30, 2021
76ac526
Update function name to portray what it accepts, event_id
MadLittleMods Nov 30, 2021
58d67f2
Fix backward gap detection
MadLittleMods Nov 30, 2021
63d61fc
Type the transport client for /timestamp_to_event requests
MadLittleMods Nov 30, 2021
70420e5
Optimize when we ask other federated homeservers
MadLittleMods Nov 30, 2021
a8644b9
Only return remote event when closer
MadLittleMods Nov 30, 2021
c38984c
Add origin_server_ts to client endpoint
MadLittleMods Nov 30, 2021
ed1360a
Make asserts more obvious
MadLittleMods Nov 30, 2021
13371a6
Better comments
MadLittleMods Nov 30, 2021
d137292
Also check for the event itself as a backward extremity
MadLittleMods Nov 30, 2021
662366a
Remove redundant NPE check
MadLittleMods Nov 30, 2021
dd7e689
Make more comment clear on why not and what we can use it to rely on
MadLittleMods Nov 30, 2021
e7d2120
Fix typo
MadLittleMods Nov 30, 2021
5660fde
Use federation_client and better validation
MadLittleMods Nov 30, 2021
c3c404b
Louder warnings about real problems
MadLittleMods Dec 1, 2021
1ff2db4
Optimize query to a single IN clause
MadLittleMods Dec 1, 2021
067d67a
Merge branch 'develop' into eric/jump-to-date
MadLittleMods Dec 1, 2021
5888eba
Fix lint
MadLittleMods Dec 1, 2021
68704f4
Fix mypy lints
MadLittleMods Dec 1, 2021
3ee5d0c
Use %r to protect from ascii junk
MadLittleMods Dec 2, 2021
2621e5d
Remove MSC3030 complement test flag because unknown state of tests at…
MadLittleMods Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 "${EXTRA_COMPLEMENT_ARGS[@]}" ./tests/...
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716,msc3030 -count=1 "${EXTRA_COMPLEMENT_ARGS[@]}" ./tests/...
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)

# MSC3030 (Jump to date API endpoint)
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
16 changes: 16 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,22 @@ async def on_backfill_request(

return 200, res

async def on_timestamp_to_event_request(
self, origin: str, room_id: str, timestamp: int, direction: str
) -> Tuple[int, Dict[str, Any]]:
with (await self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

# We only try to fetch data from the local database
event_id = await self.store.get_event_for_timestamp(
room_id, timestamp, direction
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

return 200, {
"event_id": event_id,
}

async def on_incoming_transaction(
self,
origin: str,
Expand Down
15 changes: 15 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ async def backfill(
destination, path=path, args=args, try_trailing_slash_on_400=True
)

@log_function
async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> Optional[JsonDict]:
"""
TODO
"""
path = _create_v1_path("/timestamp_to_event/%s", room_id)

args = {"ts": [str(timestamp)], "dir": [direction]}

return await self.client.get_json(
destination, path=path, args=args, try_trailing_slash_on_400=True
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

@log_function
async def send_transaction(
self,
Expand Down
12 changes: 11 additions & 1 deletion synapse/federation/transport/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
Authenticator,
BaseFederationServlet,
)
from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES
from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationTimestampLookupServlet,
)
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
from synapse.federation.transport.server.groups_server import (
GROUP_SERVER_SERVLET_CLASSES,
Expand Down Expand Up @@ -324,6 +327,13 @@ def register_servlets(
)

for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
# Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
if (
servletclass == FederationTimestampLookupServlet
and not hs.config.experimental.msc3030_enabled
):
continue
squahtx marked this conversation as resolved.
Show resolved Hide resolved

servletclass(
hs=hs,
authenticator=authenticator,
Expand Down
21 changes: 21 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,26 @@ async def on_GET(
return await self.handler.on_backfill_request(origin, room_id, versions, limit)


class FederationTimestampLookupServlet(BaseFederationServerServlet):
PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"

async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
timestamp = parse_integer_from_args(query, "ts", None)
direction = parse_string_from_args(
query, "dir", default="f", allowed_values=["f", "b"]
)

return await self.handler.on_timestamp_to_event_request(
origin, room_id, timestamp, direction
)


class FederationQueryServlet(BaseFederationServerServlet):
PATH = "/query/(?P<query_type>[^/]*)"

Expand Down Expand Up @@ -680,6 +700,7 @@ async def on_GET(
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationTimestampLookupServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
Expand Down
61 changes: 31 additions & 30 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
logger = logging.getLogger(__name__)


def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state

Args:
state: State map from type/state key to event.

Returns:
Returns a list of servers with the lowest depth of their joins.
Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]

joined_domains: Dict[str, int] = {}
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except Exception:
pass

return sorted(joined_domains.items(), key=lambda d: d[1])


class FederationHandler:
"""Handles general incoming federation requests

Expand Down Expand Up @@ -268,36 +299,6 @@ async def _maybe_backfill_inner(

curr_state = await self.state_handler.get_current_state(room_id)

def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state

Args:
state: State map from type/state key to event.

Returns:
Returns a list of servers with the lowest depth of their joins.
Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]

joined_domains: Dict[str, int] = {}
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except Exception:
pass

return sorted(joined_domains.items(), key=lambda d: d[1])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just de-nesting this function so we can re-use it

curr_domains = get_domains_from_state(curr_state)

likely_domains = [
Expand Down
75 changes: 75 additions & 0 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_and_validate_server_name
from synapse.visibility import filter_events_for_client
from synapse.handlers.federation import get_domains_from_state

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -1217,6 +1218,80 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]:
return results


class TimestampLookupHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.state_handler = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

async def get_event_for_timestamp(
self,
requester: Requester,
room_id: str,
timestamp: int,
direction: str,
) -> Optional[JsonDict]:
event_id = await self.store.get_event_for_timestamp(
room_id, timestamp, direction
)

if not event_id:
raise SynapseError(
404,
"Unable to find event from %s in direction %s" % (timestamp, direction),
errcode=Codes.NOT_FOUND,
)

# If we found an extremity, we should probably ask another homeserver
# first about more history in between
is_extremity = await self.store.check_if_event_is_extremity(room_id, event_id)
logger.info(
"get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s and is is_extremity=%s",
event_id,
timestamp,
is_extremity,
)
if is_extremity:
logger.info(
"get_event_for_timestamp: locally, we found event=%s closest to timestamp=%s is an extremity so we're asking other homeservers first",
event_id,
timestamp,
)

curr_state = await self.state_handler.get_current_state(room_id)
curr_domains = get_domains_from_state(curr_state)
likely_domains = [
domain for domain, depth in curr_domains if domain != self.server_name
]

for domain in likely_domains:
try:
remote_response = await self.transport_layer.timestamp_to_event(
domain, room_id, timestamp, direction
)
squahtx marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
"get_event_for_timestamp: response from domain(%s)=%s",
domain,
remote_response,
)
remote_event_id = remote_response.get("event_id", None)
if remote_event_id:
# TODO: Do we want to persist this as an extremity?
return remote_event_id

continue
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
logger.exception(
"Failed to fetch /timestamp_to_event from %s because %s",
domain,
e,
)
continue
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

return event_id


class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
Expand Down
29 changes: 29 additions & 0 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,33 @@ def register_txn_path(
)


class TimestampLookupRestServlet(RestServlet):
PATTERNS = client_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")

def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._store = hs.get_datastore()
self.timestamp_lookup_handler = hs.get_timestamp_lookup_handler()

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await self._auth.check_user_in_room(room_id, requester.user.to_string())

timestamp = parse_integer(request, "ts")
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])

event_id = await self.timestamp_lookup_handler.get_event_for_timestamp(
requester, room_id, timestamp, direction
)

return 200, {
"event_id": event_id,
}
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One problem I'm seeing with going with a /timestamp_to_event endpoint is that there is no way to actually paginate with /messages after getting the event_id.

We probably want to consider also returning a pagination stream token to use with the from/to on /messages

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just not worry about it, we can make a separate request to get one:

https://github.com/matrix-org/complement/blob/581cb0b26e602763c4ec52e3c5f5047a2a4d95b7/tests/msc2716_test.go#L1323-L1326

// Get a pagination token for a single event
contextRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", someEventId}, client.WithContentType("application/json"), client.WithQueries(url.Values{
	"limit": []string{"0"},
}))



class RoomSpaceSummaryRestServlet(RestServlet):
PATTERNS = (
re.compile(
Expand Down Expand Up @@ -1239,6 +1266,8 @@ def register_servlets(
RoomAliasListServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
RoomCreateRestServlet(hs).register(http_server)
if hs.config.experimental.msc3030_enabled:
TimestampLookupRestServlet(hs).register(http_server)

# Some servlets only get registered for the main process.
if not is_worker:
Expand Down
5 changes: 5 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
RoomContextHandler,
RoomCreationHandler,
RoomShutdownHandler,
TimestampLookupHandler,
)
from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
Expand Down Expand Up @@ -725,6 +726,10 @@ def get_pagination_handler(self) -> PaginationHandler:
def get_room_context_handler(self) -> RoomContextHandler:
return RoomContextHandler(self)

@cache_in_self
def get_timestamp_lookup_handler(self) -> TimestampLookupHandler:
return TimestampLookupHandler(self)

@cache_in_self
def get_registration_handler(self) -> RegistrationHandler:
return RegistrationHandler(self)
Expand Down
Loading