Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Cross-signing [4/4] -- federation edition #5727

Merged
merged 18 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5727.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add federation support for cross-signing.
8 changes: 4 additions & 4 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,20 +360,20 @@ def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id

# Retrieve list of new device updates to send to the destination
now_stream_id, results = yield self._store.get_devices_by_remote(
now_stream_id, results = yield self._store.get_device_updates_by_remote(
self._destination, last_device_list, limit=limit
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.device_list_update",
edu_type=edu_type,
content=content,
)
for content in results
for (edu_type, content) in results
]

assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
assert len(edus) <= limit, "get_device_updates_by_remote returned too many EDUs"

return (edus, now_stream_id)

Expand Down
13 changes: 12 additions & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,18 @@ def notify_user_signature_update(self, from_user_id, user_ids):
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
)

return {
"user_id": user_id,
"stream_id": stream_id,
"devices": devices,
"master_key": master_key,
"self_signing_key": self_signing_key,
}

@defer.inlineCallbacks
def user_left_room(self, user, room_id):
Expand Down
137 changes: 128 additions & 9 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
get_verify_key_from_cross_signing_key,
)
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination

logger = logging.getLogger(__name__)
Expand All @@ -49,10 +51,19 @@ def __init__(self, hs):
self.is_mine = hs.is_mine
self.clock = hs.get_clock()

self._edu_updater = SigningKeyEduUpdater(hs, self)

federation_registry = hs.get_federation_registry()

# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
hs.get_federation_registry().register_query_handler(
federation_registry.register_query_handler(
"client_keys", self.on_federation_query_client_keys
)

Expand Down Expand Up @@ -208,13 +219,15 @@ def do_remote_query(destination):
if user_id in destination_query:
results[user_id] = keys

for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key
if "master_keys" in remote_result:
for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key

for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key
if "self_signing_keys" in remote_result:
for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key

except Exception as e:
failure = _exception_to_failure(e)
Expand Down Expand Up @@ -252,7 +265,7 @@ def get_cross_signing_keys_from_cache(self, query, from_user_id):

Returns:
defer.Deferred[dict[str, dict[str, dict]]]: map from
(master|self_signing|user_signing) -> user_id -> key
(master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
"""
master_keys = {}
self_signing_keys = {}
Expand Down Expand Up @@ -344,7 +357,16 @@ def on_federation_query_client_keys(self, query_body):
"""
device_keys_query = query_body.get("device_keys", {})
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
ret = {"device_keys": res}

# add in the cross-signing keys
cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
device_keys_query, None
)

ret.update(cross_signing_keys)

return ret

@trace
@defer.inlineCallbacks
Expand Down Expand Up @@ -1058,3 +1080,100 @@ class SignatureListItem:
target_user_id = attr.ib()
target_device_id = attr.ib()
signature = attr.ib()


class SigningKeyEduUpdater(object):
"""Handles incoming signing key updates from federation and updates the DB"""

def __init__(self, hs, e2e_keys_handler):
self.store = hs.get_datastore()
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.e2e_keys_handler = e2e_keys_handler

self._remote_edu_linearizer = Linearizer(name="remote_signing_key")

# user_id -> list of updates waiting to be handled.
self._pending_updates = {}

# Recently seen stream ids. We don't bother keeping these in the DB,
# but they're useful to have them about to reduce the number of spurious
# resyncs.
self._seen_updates = ExpiringCache(
cache_name="signing_key_update_edu",
clock=self.clock,
max_len=10000,
expiry_ms=30 * 60 * 1000,
iterable=True,
)

@defer.inlineCallbacks
def incoming_signing_key_update(self, origin, edu_content):
"""Called on incoming signing key update from federation. Responsible for
parsing the EDU and adding to pending updates list.

Args:
origin (string): the server that sent the EDU
edu_content (dict): the contents of the EDU
"""

user_id = edu_content.pop("user_id")
master_key = edu_content.pop("master_key", None)
self_signing_key = edu_content.pop("self_signing_key", None)

if get_domain_from_id(user_id) != origin:
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
return

room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return

self._pending_updates.setdefault(user_id, []).append(
(master_key, self_signing_key)
)

yield self._handle_signing_key_updates(user_id)

@defer.inlineCallbacks
def _handle_signing_key_updates(self, user_id):
"""Actually handle pending updates.

Args:
user_id (string): the user whose updates we are processing
"""

device_handler = self.e2e_keys_handler.device_handler

with (yield self._remote_edu_linearizer.queue(user_id)):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
return

device_ids = []

logger.info("pending updates: %r", pending_updates)

for master_key, self_signing_key in pending_updates:
if master_key:
yield self.store.set_e2e_cross_signing_key(
user_id, "master", master_key
)
_, verify_key = get_verify_key_from_cross_signing_key(master_key)
# verify_key is a VerifyKey from signedjson, which uses
# .version to denote the portion of the key ID after the
# algorithm and colon, which is the device ID
device_ids.append(verify_key.version)
if self_signing_key:
yield self.store.set_e2e_cross_signing_key(
user_id, "self_signing", self_signing_key
)
_, verify_key = get_verify_key_from_cross_signing_key(
self_signing_key
)
device_ids.append(verify_key.version)

yield device_handler.notify_device_update(user_id, device_ids)
Loading