Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track when the pulled event signature fails (#13815)
Browse files Browse the repository at this point in the history
Because we're doing the recording in `_check_sigs_and_hash_for_pulled_events_and_fetch` (previously named `_check_sigs_and_hash_and_fetch`), this means we will track signature failures for `backfill`, `get_room_state`, `get_event_auth`, and `get_missing_events` (all pulled event scenarios). And we also record signature failures from `get_pdu`.

Part of #13700

Part of #13676 and #13356

This PR will be especially important for #13816 so we can avoid the costly `_get_state_ids_after_missing_prev_event` down the line when `/messages` calls backfill.
  • Loading branch information
MadLittleMods committed Oct 3, 2022
1 parent 92ae90a commit 70a4317
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/13815.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep track when an event pulled over federation fails its signature check so we can intelligently back-off in the future.
25 changes: 22 additions & 3 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Awaitable, Callable, Optional

from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
Expand Down Expand Up @@ -58,7 +58,12 @@ def __init__(self, hs: "HomeServer"):

@trace
async def _check_sigs_and_hash(
self, room_version: RoomVersion, pdu: EventBase
self,
room_version: RoomVersion,
pdu: EventBase,
record_failure_callback: Optional[
Callable[[EventBase, str], Awaitable[None]]
] = None,
) -> EventBase:
"""Checks that event is correctly signed by the sending server.
Expand All @@ -70,6 +75,11 @@ async def _check_sigs_and_hash(
Args:
room_version: The room version of the PDU
pdu: the event to be checked
record_failure_callback: A callback to run whenever the given event
fails signature or hash checks. This includes exceptions
that would be normally be thrown/raised but also things like
checking for event tampering where we just return the redacted
event.
Returns:
* the original event if the checks pass
Expand All @@ -80,7 +90,12 @@ async def _check_sigs_and_hash(
InvalidEventSignatureError if the signature check failed. Nothing
will be logged in this case.
"""
await _check_sigs_on_pdu(self.keyring, room_version, pdu)
try:
await _check_sigs_on_pdu(self.keyring, room_version, pdu)
except InvalidEventSignatureError as exc:
if record_failure_callback:
await record_failure_callback(pdu, str(exc))
raise exc

if not check_event_content_hash(pdu):
# let's try to distinguish between failures because the event was
Expand Down Expand Up @@ -116,6 +131,10 @@ async def _check_sigs_and_hash(
"event_id": pdu.event_id,
}
)
if record_failure_callback:
await record_failure_callback(
pdu, "Event content has been tampered with"
)
return redacted_event

spam_check = await self.spam_checker.check_event_for_spam(pdu)
Expand Down
50 changes: 40 additions & 10 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def backfill(
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]

# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_and_fetch(
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
dest, pdus, room_version=room_version
)

Expand Down Expand Up @@ -328,7 +328,17 @@ async def get_pdu_from_destination_raw(

# Check signatures are correct.
try:
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)

async def _record_failure_callback(
event: EventBase, cause: str
) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)

signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
Expand Down Expand Up @@ -547,24 +557,28 @@ async def get_room_state(
len(auth_event_map),
)

valid_auth_events = await self._check_sigs_and_hash_and_fetch(
valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_event_map.values(), room_version
)

valid_state_events = await self._check_sigs_and_hash_and_fetch(
destination, state_event_map.values(), room_version
valid_state_events = (
await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, state_event_map.values(), room_version
)
)

return valid_state_events, valid_auth_events

@trace
async def _check_sigs_and_hash_and_fetch(
async def _check_sigs_and_hash_for_pulled_events_and_fetch(
self,
origin: str,
pdus: Collection[EventBase],
room_version: RoomVersion,
) -> List[EventBase]:
"""Checks the signatures and hashes of a list of events.
"""
Checks the signatures and hashes of a list of pulled events we got from
federation and records any signature failures as failed pull attempts.
If a PDU fails its signature check then we check if we have it in
the database, and if not then request it from the sender's server (if that
Expand Down Expand Up @@ -597,11 +611,17 @@ async def _check_sigs_and_hash_and_fetch(

valid_pdus: List[EventBase] = []

async def _record_failure_callback(event: EventBase, cause: str) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)

async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=origin,
room_version=room_version,
record_failure_callback=_record_failure_callback,
)

if valid_pdu:
Expand All @@ -618,6 +638,9 @@ async def _check_sigs_and_hash_and_fetch_one(
pdu: EventBase,
origin: str,
room_version: RoomVersion,
record_failure_callback: Optional[
Callable[[EventBase, str], Awaitable[None]]
] = None,
) -> Optional[EventBase]:
"""Takes a PDU and checks its signatures and hashes.
Expand All @@ -634,14 +657,21 @@ async def _check_sigs_and_hash_and_fetch_one(
origin
pdu
room_version
record_failure_callback: A callback to run whenever the given event
fails signature or hash checks. This includes exceptions
that would be normally be thrown/raised but also things like
checking for event tampering where we just return the redacted
event.
Returns:
The PDU (possibly redacted) if it has valid signatures and hashes.
None if no valid copy could be found.
"""

try:
return await self._check_sigs_and_hash(room_version, pdu)
return await self._check_sigs_and_hash(
room_version, pdu, record_failure_callback
)
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
Expand Down Expand Up @@ -694,7 +724,7 @@ async def get_event_auth(

auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]

signed_auth = await self._check_sigs_and_hash_and_fetch(
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
)

Expand Down Expand Up @@ -1401,7 +1431,7 @@ async def get_missing_events(
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]

signed_events = await self._check_sigs_and_hash_and_fetch(
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version
)
except HttpResponseException as e:
Expand Down
75 changes: 75 additions & 0 deletions tests/federation/test_federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@

from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests.test_utils import event_injection
from tests.unittest import FederatingHomeserverTestCase


class FederationClientTest(FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer):
super().prepare(reactor, clock, homeserver)

Expand Down Expand Up @@ -231,6 +240,72 @@ def _get_pdu_once(self) -> EventBase:

return remote_pdu

def test_backfill_invalid_signature_records_failed_pull_attempts(
self,
) -> None:
"""
Test to make sure that events from /backfill with invalid signatures get
recorded as failed pull attempts.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

# We purposely don't run `add_hashes_and_signatures_from_other_server`
# over this because we want the signature check to fail.
pulled_event, _ = self.get_success(
event_injection.create_event(
self.hs,
room_id=room_id,
sender=OTHER_USER,
type="test_event_type",
content={"body": "garply"},
)
)

# We expect an outbound request to /backfill, so stub that out
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
_mock_response(
{
"origin": "yet.another.server",
"origin_server_ts": 900,
# Mimic the other server returning our new `pulled_event`
"pdus": [pulled_event.get_pdu_json()],
}
)
)

self.get_success(
self.hs.get_federation_client().backfill(
# We use "yet.another.server" instead of
# `self.OTHER_SERVER_NAME` because we want to see the behavior
# from `_check_sigs_and_hash_and_fetch_one` where it tries to
# fetch the PDU again from the origin server if the signature
# fails. Just want to make sure that the failure is counted from
# both code paths.
dest="yet.another.server",
room_id=room_id,
limit=1,
extremities=[pulled_event.event_id],
),
)

# Make sure our failed pull attempt was recorded
backfill_num_attempts = self.get_success(
main_store.db_pool.simple_select_one_onecol(
table="event_failed_pull_attempts",
keyvalues={"event_id": pulled_event.event_id},
retcol="num_attempts",
)
)
# This is 2 because it failed once from `self.OTHER_SERVER_NAME` and the
# other from "yet.another.server"
self.assertEqual(backfill_num_attempts, 2)


def _mock_response(resp: JsonDict):
body = json.dumps(resp).encode("utf-8")
Expand Down
4 changes: 2 additions & 2 deletions tests/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ async def _check_event_auth(origin, event, context):

federation_event_handler._check_event_auth = _check_event_auth
self.client = self.homeserver.get_federation_client()
self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed(
pdus
self.client._check_sigs_and_hash_for_pulled_events_and_fetch = (
lambda dest, pdus, **k: succeed(pdus)
)

# Send the join, it should return None (which is not an error)
Expand Down

0 comments on commit 70a4317

Please sign in to comment.