Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit da0e9f8

Browse files
authored
Faster joins: parse msc3706 fields in send_join response (#12011)
Part of my work on #11249: add code to handle the new fields added in MSC3706.
1 parent 6127c4b commit da0e9f8

File tree

6 files changed

+140
-33
lines changed

6 files changed

+140
-33
lines changed

changelog.d/12011.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Preparation for faster-room-join work: parse msc3706 fields in send_join response.

synapse/config/experimental.py

+4
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,7 @@ def read_config(self, config: JsonDict, **kwargs):
6464

6565
# MSC3706 (server-side support for partial state in /send_join responses)
6666
self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False)
67+
68+
# experimental support for faster joins over federation (msc2775, msc3706)
69+
# requires a target server with msc3706_enabled enabled.
70+
self.faster_joins_enabled: bool = experimental.get("faster_joins", False)

synapse/federation/federation_client.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
1+
# Copyright 2015-2022 The Matrix.org Foundation C.I.C.
22
# Copyright 2020 Sorunome
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -89,6 +89,12 @@ class SendJoinResult:
8989
state: List[EventBase]
9090
auth_chain: List[EventBase]
9191

92+
# True if 'state' elides non-critical membership events
93+
partial_state: bool
94+
95+
# if 'partial_state' is set, a list of the servers in the room (otherwise empty)
96+
servers_in_room: List[str]
97+
9298

9399
class FederationClient(FederationBase):
94100
def __init__(self, hs: "HomeServer"):
@@ -876,11 +882,18 @@ async def _execute(pdu: EventBase) -> None:
876882
% (auth_chain_create_events,)
877883
)
878884

885+
if response.partial_state and not response.servers_in_room:
886+
raise InvalidResponseError(
887+
"partial_state was set, but no servers were listed in the room"
888+
)
889+
879890
return SendJoinResult(
880891
event=event,
881892
state=signed_state,
882893
auth_chain=signed_auth,
883894
origin=destination,
895+
partial_state=response.partial_state,
896+
servers_in_room=response.servers_in_room or [],
884897
)
885898

886899
# MSC3083 defines additional error codes for room joins.

synapse/federation/transport/client.py

+87-31
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
1+
# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
22
# Copyright 2020 Sorunome
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,6 +60,7 @@ class TransportLayerClient:
6060
def __init__(self, hs):
6161
self.server_name = hs.hostname
6262
self.client = hs.get_federation_http_client()
63+
self._faster_joins_enabled = hs.config.experimental.faster_joins_enabled
6364

6465
async def get_room_state_ids(
6566
self, destination: str, room_id: str, event_id: str
@@ -336,10 +337,15 @@ async def send_join_v2(
336337
content: JsonDict,
337338
) -> "SendJoinResponse":
338339
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
340+
query_params: Dict[str, str] = {}
341+
if self._faster_joins_enabled:
342+
# lazy-load state on join
343+
query_params["org.matrix.msc3706.partial_state"] = "true"
339344

340345
return await self.client.put_json(
341346
destination=destination,
342347
path=path,
348+
args=query_params,
343349
data=content,
344350
parser=SendJoinParser(room_version, v1_api=False),
345351
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
@@ -1271,6 +1277,12 @@ class SendJoinResponse:
12711277
# "event" is not included in the response.
12721278
event: Optional[EventBase] = None
12731279

1280+
# The room state is incomplete
1281+
partial_state: bool = False
1282+
1283+
# List of servers in the room
1284+
servers_in_room: Optional[List[str]] = None
1285+
12741286

12751287
@ijson.coroutine
12761288
def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
@@ -1297,6 +1309,32 @@ def _event_list_parser(
12971309
events.append(event)
12981310

12991311

1312+
@ijson.coroutine
1313+
def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
1314+
"""Helper function for use with `ijson.items_coro`
1315+
1316+
Parses the partial_state field in send_join responses
1317+
"""
1318+
while True:
1319+
val = yield
1320+
if not isinstance(val, bool):
1321+
raise TypeError("partial_state must be a boolean")
1322+
response.partial_state = val
1323+
1324+
1325+
@ijson.coroutine
1326+
def _servers_in_room_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
1327+
"""Helper function for use with `ijson.items_coro`
1328+
1329+
Parses the servers_in_room field in send_join responses
1330+
"""
1331+
while True:
1332+
val = yield
1333+
if not isinstance(val, list) or any(not isinstance(x, str) for x in val):
1334+
raise TypeError("servers_in_room must be a list of strings")
1335+
response.servers_in_room = val
1336+
1337+
13001338
class SendJoinParser(ByteParser[SendJoinResponse]):
13011339
"""A parser for the response to `/send_join` requests.
13021340
@@ -1308,44 +1346,62 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
13081346
CONTENT_TYPE = "application/json"
13091347

13101348
def __init__(self, room_version: RoomVersion, v1_api: bool):
1311-
self._response = SendJoinResponse([], [], {})
1349+
self._response = SendJoinResponse([], [], event_dict={})
13121350
self._room_version = room_version
1351+
self._coros = []
13131352

13141353
# The V1 API has the shape of `[200, {...}]`, which we handle by
13151354
# prefixing with `item.*`.
13161355
prefix = "item." if v1_api else ""
13171356

1318-
self._coro_state = ijson.items_coro(
1319-
_event_list_parser(room_version, self._response.state),
1320-
prefix + "state.item",
1321-
use_float=True,
1322-
)
1323-
self._coro_auth = ijson.items_coro(
1324-
_event_list_parser(room_version, self._response.auth_events),
1325-
prefix + "auth_chain.item",
1326-
use_float=True,
1327-
)
1328-
# TODO Remove the unstable prefix when servers have updated.
1329-
#
1330-
# By re-using the same event dictionary this will cause the parsing of
1331-
# org.matrix.msc3083.v2.event and event to stomp over each other.
1332-
# Generally this should be fine.
1333-
self._coro_unstable_event = ijson.kvitems_coro(
1334-
_event_parser(self._response.event_dict),
1335-
prefix + "org.matrix.msc3083.v2.event",
1336-
use_float=True,
1337-
)
1338-
self._coro_event = ijson.kvitems_coro(
1339-
_event_parser(self._response.event_dict),
1340-
prefix + "event",
1341-
use_float=True,
1342-
)
1357+
self._coros = [
1358+
ijson.items_coro(
1359+
_event_list_parser(room_version, self._response.state),
1360+
prefix + "state.item",
1361+
use_float=True,
1362+
),
1363+
ijson.items_coro(
1364+
_event_list_parser(room_version, self._response.auth_events),
1365+
prefix + "auth_chain.item",
1366+
use_float=True,
1367+
),
1368+
# TODO Remove the unstable prefix when servers have updated.
1369+
#
1370+
# By re-using the same event dictionary this will cause the parsing of
1371+
# org.matrix.msc3083.v2.event and event to stomp over each other.
1372+
# Generally this should be fine.
1373+
ijson.kvitems_coro(
1374+
_event_parser(self._response.event_dict),
1375+
prefix + "org.matrix.msc3083.v2.event",
1376+
use_float=True,
1377+
),
1378+
ijson.kvitems_coro(
1379+
_event_parser(self._response.event_dict),
1380+
prefix + "event",
1381+
use_float=True,
1382+
),
1383+
]
1384+
1385+
if not v1_api:
1386+
self._coros.append(
1387+
ijson.items_coro(
1388+
_partial_state_parser(self._response),
1389+
"org.matrix.msc3706.partial_state",
1390+
use_float="True",
1391+
)
1392+
)
1393+
1394+
self._coros.append(
1395+
ijson.items_coro(
1396+
_servers_in_room_parser(self._response),
1397+
"org.matrix.msc3706.servers_in_room",
1398+
use_float="True",
1399+
)
1400+
)
13431401

13441402
def write(self, data: bytes) -> int:
1345-
self._coro_state.send(data)
1346-
self._coro_auth.send(data)
1347-
self._coro_unstable_event.send(data)
1348-
self._coro_event.send(data)
1403+
for c in self._coros:
1404+
c.send(data)
13491405

13501406
return len(data)
13511407

synapse/python_dependencies.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
# We enforce that we have a `cryptography` version that bundles an `openssl`
8888
# with the latest security patches.
8989
"cryptography>=3.4.7",
90-
"ijson>=3.1",
90+
# ijson 3.1.4 fixes a bug with "." in property names
91+
"ijson>=3.1.4",
9192
"matrix-common~=1.1.0",
9293
]
9394

tests/federation/transport/test_client.py

+32
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,35 @@ def test_two_writes(self) -> None:
6262
self.assertEqual(len(parsed_response.state), 1, parsed_response)
6363
self.assertEqual(parsed_response.event_dict, {}, parsed_response)
6464
self.assertIsNone(parsed_response.event, parsed_response)
65+
self.assertFalse(parsed_response.partial_state, parsed_response)
66+
self.assertEqual(parsed_response.servers_in_room, None, parsed_response)
67+
68+
def test_partial_state(self) -> None:
69+
"""Check that the partial_state flag is correctly parsed"""
70+
parser = SendJoinParser(RoomVersions.V1, False)
71+
response = {
72+
"org.matrix.msc3706.partial_state": True,
73+
}
74+
75+
serialised_response = json.dumps(response).encode()
76+
77+
# Send data to the parser
78+
parser.write(serialised_response)
79+
80+
# Retrieve and check the parsed SendJoinResponse
81+
parsed_response = parser.finish()
82+
self.assertTrue(parsed_response.partial_state)
83+
84+
def test_servers_in_room(self) -> None:
85+
"""Check that the servers_in_room field is correctly parsed"""
86+
parser = SendJoinParser(RoomVersions.V1, False)
87+
response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
88+
89+
serialised_response = json.dumps(response).encode()
90+
91+
# Send data to the parser
92+
parser.write(serialised_response)
93+
94+
# Retrieve and check the parsed SendJoinResponse
95+
parsed_response = parser.finish()
96+
self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])

0 commit comments

Comments
 (0)