Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed request_key from the SyncConfig (moved outside as its own function parameter) #17201

Merged
merged 8 commits into from
May 16, 2024
1 change: 1 addition & 0 deletions changelog.d/17200.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prepare sync handler to be able to return different sync responses (`SyncVersion`).
1 change: 1 addition & 0 deletions changelog.d/17201.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Organize the sync cache key parameter outside of the sync config (separate concerns).
69 changes: 60 additions & 9 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -112,12 +113,28 @@
SyncRequestKey = Tuple[Any, ...]


class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.

This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""

# These string values are semantically significant because they are used in the the
# metrics

# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
filter_collection: FilterCollection
is_guest: bool
request_key: SyncRequestKey
device_id: Optional[str]


Expand Down Expand Up @@ -309,13 +326,26 @@ async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.

Args:
requester: The user requesting the sync response.
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
request_key: The key to use for caching the response.
since_token: The point in the stream to sync from.
timeout: How long to wait for new data to arrive before giving up.
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
Expand All @@ -324,9 +354,10 @@ async def wait_for_sync_for_user(
await self.auth_blocking.check_auth_blocking(requester=requester)

res = await self.response_cache.wrap(
sync_config.request_key,
request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
Expand All @@ -338,6 +369,7 @@ async def wait_for_sync_for_user(
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
Expand All @@ -363,9 +395,11 @@ async def _wait_for_sync_for_user(
else:
sync_type = "incremental_sync"

sync_label = f"{sync_version}:{sync_type}"

context = current_context()
if context:
context.tag = sync_type
context.tag = sync_label

# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
Expand All @@ -384,14 +418,16 @@ async def _wait_for_sync_for_user(
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)

result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
Expand All @@ -416,13 +452,14 @@ async def current_sync_callback(
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()

return result

async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
Expand All @@ -431,12 +468,26 @@ async def current_sync_for_user(
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.

Args:
sync_config: Config/info necessary to process the sync request.
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.p.
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: SyncResult = await self.generate_sync_result(
sync_config, since_token, full_state
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
Expand Down Expand Up @@ -209,7 +210,6 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user=user,
filter_collection=filter_collection,
is_guest=requester.is_guest,
request_key=request_key,
device_id=device_id,
)

Expand All @@ -232,6 +232,8 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
request_key,
since_token=since_token,
timeout=timeout,
full_state=full_state,
Expand Down
17 changes: 15 additions & 2 deletions tests/events/test_presence_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock

from tests.handlers.test_sync import generate_sync_config
from tests.handlers.test_sync import SyncRequestKey, SyncVersion, generate_sync_config
from tests.unittest import (
FederatingHomeserverTestCase,
HomeserverTestCase,
Expand Down Expand Up @@ -498,6 +498,15 @@ def send_presence_update(
return channel.json_body


_request_key = 0


def generate_request_key() -> SyncRequestKey:
global _request_key
_request_key += 1
return ("request_key", _request_key)


def sync_presence(
testcase: HomeserverTestCase,
user_id: str,
Expand All @@ -521,7 +530,11 @@ def sync_presence(
sync_config = generate_sync_config(requester.user.to_string())
sync_result = testcase.get_success(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester, sync_config, since_token
requester,
sync_config,
SyncVersion.SYNC_V2,
generate_request_key(),
since_token,
)
)

Expand Down
Loading
Loading