Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implement v2 APIs for send_join and send_leave #6349

Merged
merged 10 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6349.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)).
88 changes: 76 additions & 12 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,7 @@ def check_authchain_validity(signed_auth_chain):

@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
content = yield self._do_send_join(destination, pdu)

logger.debug("Got content: %s", content)

Expand Down Expand Up @@ -737,6 +731,44 @@ def send_request(destination):

return self._try_destination_list("send_join", destinations, send_request)

@defer.inlineCallbacks
def _do_send_join(self, destination, pdu):
time_now = self._clock.time_msec()

try:
content = yield self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()

# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()

logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")

resp = yield self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]

@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
room_version = yield self.store.get_room_version(room_id)
Expand Down Expand Up @@ -846,18 +878,50 @@ def send_leave(self, destinations, pdu):

@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_leave(
content = yield self._do_send_leave(destination, pdu)

logger.debug("Got content: %s", content)
return None

return self._try_destination_list("send_leave", destinations, send_request)

@defer.inlineCallbacks
def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()

try:
content = yield self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

logger.debug("Got content: %s", content)
return None
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()

return self._try_destination_list("send_leave", destinations, send_request)
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()

logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")

resp = yield self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]

def get_public_rooms(
self,
Expand Down
15 changes: 5 additions & 10 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,10 @@ async def on_send_join_request(self, origin, content, room_id):

res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
return (
200,
{
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
},
)
return {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
}

async def on_make_leave_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin)
Expand All @@ -419,7 +414,7 @@ async def on_send_leave_request(self, origin, content, room_id):
pdu = await self._check_sigs_and_hash(room_version, pdu)

await self.handler.on_send_leave_request(origin, pdu)
return 200, {}
return {}

async def on_event_auth(self, origin, room_id, event_id):
with (await self._server_linearizer.queue((origin, room_id))):
Expand Down
33 changes: 31 additions & 2 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def make_membership_event(self, destination, room_id, user_id, membership, param

@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
def send_join_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)

response = yield self.client.put_json(
Expand All @@ -278,7 +278,18 @@ def send_join(self, destination, room_id, event_id, content):

@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
def send_join_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)

response = yield self.client.put_json(
destination=destination, path=path, data=content
)

return response

@defer.inlineCallbacks
@log_function
def send_leave_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)

response = yield self.client.put_json(
Expand All @@ -294,6 +305,24 @@ def send_leave(self, destination, room_id, event_id, content):

return response

@defer.inlineCallbacks
@log_function
def send_leave_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)

response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)

return response

@defer.inlineCallbacks
@log_function
def send_invite_v1(self, destination, room_id, event_id, content):
Expand Down
32 changes: 28 additions & 4 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,19 @@ async def on_GET(self, origin, content, query, context, user_id):
return 200, content


class FederationSendLeaveServlet(BaseFederationServlet):
class FederationV1SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"

async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, (200, content)


class FederationV2SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"

PREFIX = FEDERATION_V2_PREFIX

async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, content
Expand All @@ -521,9 +531,21 @@ async def on_GET(self, origin, content, query, context, event_id):
return await self.handler.on_event_auth(origin, context, event_id)


class FederationSendJoinServlet(BaseFederationServlet):
class FederationV1SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"

async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = await self.handler.on_send_join_request(origin, content, context)
return 200, (200, content)


class FederationV2SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"

PREFIX = FEDERATION_V2_PREFIX

async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
Expand Down Expand Up @@ -1367,8 +1389,10 @@ async def on_GET(self, origin, content, query, room_id):
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
FederationEventServlet,
FederationSendJoinServlet,
FederationSendLeaveServlet,
FederationV1SendJoinServlet,
FederationV2SendJoinServlet,
FederationV1SendLeaveServlet,
FederationV2SendLeaveServlet,
FederationV1InviteServlet,
FederationV2InviteServlet,
FederationQueryAuthServlet,
Expand Down