Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that incoming to-device messages are not dropped #17127

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17127.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which meant that to-device messages received over federation could be dropped when the server was under load, or when networking problems disrupted communication between Synapse processes or with the database.
44 changes: 26 additions & 18 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,25 @@ async def _process_edu(edu_dict: JsonDict) -> None:
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
try:
await self.registry.on_edu(edu.edu_type, origin, edu.content)
except Exception:
# If there was an error handling the EDU, we must reject the
# transaction.
#
# Some EDU types (notably, to-device messages) are, despite their name,
# expected to be reliable; if we weren't able to do something with it,
# we have to tell the sender that, and the only way the protocol gives
# us to do so is by sending an HTTP error back on the transaction.
#
# We log the exception now, and then raise a new SynapseError to cause
# the transaction to be failed.
logger.exception("Error handling EDU of type %s", edu.edu_type)
raise SynapseError(500, f"Error handing EDU of type {edu.edu_type}")

# TODO: if the first EDU fails, we should probably abort the whole
# thing rather than carrying on with the rest of them. That would
# probably be best done inside `concurrently_execute`.

await concurrently_execute(
_process_edu,
Expand Down Expand Up @@ -1414,12 +1432,7 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
handler = self.edu_handlers.get(edu_type)
if handler:
with start_active_span_from_edu(content, "handle_edu"):
try:
await handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
await handler(origin, content)
return

# Check if we can route it somewhere else that isn't us
Expand All @@ -1428,17 +1441,12 @@ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)

try:
await self._send_edu(
instance_name=route_to,
edu_type=edu_type,
origin=origin,
content=content,
)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
await self._send_edu(
instance_name=route_to,
edu_type=edu_type,
origin=origin,
content=content,
)
return

# Oh well, let's just log and move on.
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
"""
Handle receiving to-device messages from remote homeservers.

Note that any errors thrown from this method will cause the federation /send
request to receive an error response.

Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
Expand Down
17 changes: 17 additions & 0 deletions tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ def test_bad_request(self, query_content: bytes) -> None:
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON")

def test_failed_edu_causes_500(self) -> None:
    """If an EDU handler raises, the /send request should be answered with a 500."""

    async def always_raises(_origin: str, _content: JsonDict) -> None:
        # Stand-in for an EDU handler that fails unexpectedly.
        raise Exception("bleh")

    # Attach the failing handler to a made-up EDU type.
    registry = self.hs.get_federation_registry()
    registry.register_edu_handler("FAIL_EDU_TYPE", always_raises)

    # Submit a transaction whose only EDU is of that type.
    transaction_body = {"edus": [{"edu_type": "FAIL_EDU_TYPE", "content": {}}]}
    response = self.make_signed_federation_request(
        "PUT",
        "/_matrix/federation/v1/send/txn",
        transaction_body,
    )

    # The handler's failure must surface as an HTTP 500 on the transaction.
    self.assertEqual(500, response.code, response.result)


class ServerACLsTestCase(unittest.TestCase):
def test_blocked_server(self) -> None:
Expand Down
9 changes: 8 additions & 1 deletion tests/federation/transport/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ def test_edu_debugging_doesnt_explode(self) -> None:
"/_matrix/federation/v1/send/txn_id_1234/",
content={
"edus": [
{"edu_type": EduTypes.DEVICE_LIST_UPDATE, "content": {"foo": "bar"}}
{
"edu_type": EduTypes.DEVICE_LIST_UPDATE,
"content": {
"device_id": "QBUAZIFURK",
"stream_id": 0,
"user_id": "@user:id",
},
},
],
"pdus": [],
},
Expand Down
Loading