Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Pass room_version into event_from_pdu_json #6856

Merged
merged 1 commit into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6856.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactoring work in preparation for changing the event redaction algorithm.
28 changes: 16 additions & 12 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,9 +23,13 @@

from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
)
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events import EventBase, event_type_from_format_version
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
Expand All @@ -33,7 +38,7 @@
make_deferred_yieldable,
preserve_fn,
)
from synapse.types import get_domain_from_id
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -342,16 +347,15 @@ def _is_invite_via_3pid(event):
)


def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
"""Construct a FrozenEvent from an event json received over federation
def event_from_pdu_json(
pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False
) -> EventBase:
"""Construct an EventBase from an event json received over federation

Args:
pdu_json (object): pdu as received over federation
event_format_version (int): The event format version
outlier (bool): True to mark this event as an outlier

Returns:
FrozenEvent
pdu_json: pdu as received over federation
room_version: The version of the room this event belongs to
outlier: True to mark this event as an outlier

Raises:
SynapseError: if the pdu is missing required fields or is otherwise
Expand All @@ -370,7 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)

event = event_type_from_format_version(event_format_version)(pdu_json)
event = event_type_from_format_version(room_version.event_format)(pdu_json)

event.internal_metadata.outlier = outlier

Expand Down
35 changes: 16 additions & 19 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
RoomVersion,
RoomVersions,
)
from synapse.events import EventBase, builder, room_version_to_event_format
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.utils import log_function
Expand Down Expand Up @@ -209,18 +209,18 @@ async def backfill(

logger.debug("backfill transaction_data=%r", transaction_data)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)

pdus = [
event_from_pdu_json(p, format_ver, outlier=False)
event_from_pdu_json(p, room_version, outlier=False)
for p in transaction_data["pdus"]
]

# FIXME: We should handle signature failures more gracefully.
pdus[:] = await make_deferred_yieldable(
defer.gatherResults(
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
self._check_sigs_and_hashes(room_version.identifier, pdus),
consumeErrors=True,
).addErrback(unwrapFirstError)
)

Expand Down Expand Up @@ -262,8 +262,6 @@ async def get_pdu(

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

format_ver = room_version.event_format

signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
Expand All @@ -284,7 +282,7 @@ async def get_pdu(
)

pdu_list = [
event_from_pdu_json(p, format_ver, outlier=outlier)
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
]

Expand Down Expand Up @@ -350,15 +348,15 @@ async def get_room_state_ids(
async def get_event_auth(self, destination, room_id, event_id):
res = await self.transport_layer.get_event_auth(destination, room_id, event_id)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)

auth_chain = [
event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"]
event_from_pdu_json(p, room_version, outlier=True)
for p in res["auth_chain"]
]

signed_auth = await self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True, room_version=room_version
destination, auth_chain, outlier=True, room_version=room_version.identifier
)

signed_auth.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -547,12 +545,12 @@ async def send_request(destination) -> Dict[str, Any]:
logger.debug("Got content: %s", content)

state = [
event_from_pdu_json(p, room_version.event_format, outlier=True)
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]

auth_chain = [
event_from_pdu_json(p, room_version.event_format, outlier=True)
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]

Expand Down Expand Up @@ -677,7 +675,7 @@ async def send_invite(

logger.debug("Got response to send_invite: %s", pdu_dict)

pdu = event_from_pdu_json(pdu_dict, room_version.event_format)
pdu = event_from_pdu_json(pdu_dict, room_version)

# Check signatures are correct.
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
Expand Down Expand Up @@ -865,15 +863,14 @@ async def get_missing_events(
timeout=timeout,
)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)

events = [
event_from_pdu_json(e, format_ver) for e in content.get("events", [])
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]

signed_events = await self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False, room_version=room_version
destination, events, outlier=False, room_version=room_version.identifier
)
except HttpResponseException as e:
if not e.code == 400:
Expand Down
41 changes: 14 additions & 27 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
Expand Down Expand Up @@ -234,24 +233,17 @@ async def _handle_pdus_in_txn(
continue

try:
room_version = await self.store.get_room_version_id(room_id)
room_version = await self.store.get_room_version(room_id)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue

try:
format_ver = room_version_to_event_format(room_version)
except UnsupportedRoomVersionError:
except UnsupportedRoomVersionError as e:
# this can happen if support for a given room version is withdrawn,
# so that we still get events for said room.
logger.info(
"Ignoring PDU for room %s with unknown version %s",
room_id,
room_version,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're logging it seems useful to include why we're ignoring an event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but str(e) will tell us that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it won't log the room_id or the room version? (UnsupportedRoomVersionError doesn't include those things)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, ok then. (The uses I have seen don't specify a custom message)

logger.info("Ignoring PDU: %s", e)
continue

event = event_from_pdu_json(p, format_ver)
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)

pdu_results = {}
Expand Down Expand Up @@ -407,9 +399,7 @@ async def on_invite_request(
Codes.UNSUPPORTED_ROOM_VERSION,
)

format_ver = room_version.event_format

pdu = event_from_pdu_json(content, format_ver)
pdu = event_from_pdu_json(content, room_version)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
Expand All @@ -420,16 +410,15 @@ async def on_invite_request(
async def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
room_version = await self.store.get_room_version(room_id)
pdu = event_from_pdu_json(content, room_version)

origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)

logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)

pdu = await self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)

res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
Expand All @@ -451,16 +440,15 @@ async def on_make_leave_request(self, origin, room_id, user_id):
async def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
room_version = await self.store.get_room_version(room_id)
pdu = event_from_pdu_json(content, room_version)

origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)

logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)

pdu = await self._check_sigs_and_hash(room_version, pdu)
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)

await self.handler.on_send_leave_request(origin, pdu)
return {}
Expand Down Expand Up @@ -498,15 +486,14 @@ async def on_query_auth_request(self, origin, content, room_id, event_id):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
room_version = await self.store.get_room_version(room_id)

auth_chain = [
event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
event_from_pdu_json(e, room_version) for e in content["auth_chain"]
]

signed_auth = await self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True, room_version=room_version
origin, auth_chain, outlier=True, room_version=room_version.identifier
)

ret = await self.handler.on_query_auth(
Expand Down
6 changes: 4 additions & 2 deletions tests/handlers/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def test_rejected_message_event_state(self):
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(self.store.get_room_version(room_id))

# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
Expand All @@ -120,7 +121,7 @@ def test_rejected_message_event_state(self):
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
join_event.format_version,
room_version,
)

with LoggingContext(request="send_rejected"):
Expand Down Expand Up @@ -149,6 +150,7 @@ def test_rejected_state_event_state(self):
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(self.store.get_room_version(room_id))

# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
Expand All @@ -171,7 +173,7 @@ def test_rejected_state_event_state(self):
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
join_event.format_version,
room_version,
)

with LoggingContext(request="send_rejected"):
Expand Down