Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implementation of MSC2314 (#6176)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl authored Nov 27, 2019
1 parent 0d27aba commit 0f87b91
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 65 deletions.
1 change: 1 addition & 0 deletions changelog.d/6176.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314.
26 changes: 17 additions & 9 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(self, hs):

self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
Expand Down Expand Up @@ -264,9 +266,6 @@ async def received_edu(self, origin, edu_type, content):
await self.registry.on_edu(edu_type, origin, content)

async def on_context_state_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")

origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

Expand All @@ -280,13 +279,18 @@ async def on_context_state_request(self, origin, room_id, event_id):
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (await self._server_linearizer.queue((origin, room_id))):
resp = await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
resp = dict(
await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)
)

room_version = await self.store.get_room_version(room_id)
resp["room_version"] = room_version

return 200, resp

async def on_state_ids_request(self, origin, room_id, event_id):
Expand All @@ -306,7 +310,11 @@ async def on_state_ids_request(self, origin, room_id, event_id):
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}

async def _on_context_state_request_compute(self, room_id, event_id):
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (await self.state.get_current_state(room_id)).values()

auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])

return {
Expand Down
6 changes: 3 additions & 3 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,15 @@ async def on_GET(self, origin, content, query, event_id):
return await self.handler.on_pdu_request(origin, event_id)


class FederationStateServlet(BaseFederationServlet):
class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"

# This is when someone asks for all data for a given context.
async def on_GET(self, origin, content, query, context):
return await self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None, required=True),
parse_string_from_args(query, "event_id", None, required=False),
)


Expand Down Expand Up @@ -1360,7 +1360,7 @@ async def on_GET(self, origin, content, query, room_id):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
Expand Down
6 changes: 5 additions & 1 deletion sytest-blacklist
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file serves as a blacklist for SyTest tests that we expect will fail in
# Synapse.
#
#
# Each line of this file is scanned by sytest during a run and if the line
# exactly matches the name of a test, it will be marked as "expected fail",
# meaning the test will still run, but failure will not mark the entire test
Expand Down Expand Up @@ -29,3 +29,7 @@ Enabling an unknown default rule fails with 404

# Blacklisted due to https://github.com/matrix-org/synapse/issues/1663
New federated private chats get full presence information (SYN-115)

# Blacklisted due to https://github.com/matrix-org/matrix-doc/pull/2314 removing
# this requirement from the spec
Inbound federation of state requires event_id as a mandatory paramater
28 changes: 3 additions & 25 deletions tests/federation/test_complexity.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from twisted.internet import defer

from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter

from tests import unittest


class RoomComplexityTests(unittest.HomeserverTestCase):
class RoomComplexityTests(unittest.FederatingHomeserverTestCase):

servlets = [
admin.register_servlets,
Expand All @@ -41,25 +38,6 @@ def default_config(self, name="test"):
config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05}
return config

def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")

ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)

def test_complexity_simple(self):

u1 = self.register_user("u1", "pass")
Expand Down Expand Up @@ -105,7 +83,7 @@ def test_join_too_large(self):

d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
"roomid",
UserID.from_string(u1),
{"membership": "join"},
Expand Down Expand Up @@ -146,7 +124,7 @@ def test_join_too_large_once_joined(self):

d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
room_1,
UserID.from_string(u1),
{"membership": "join"},
Expand Down
4 changes: 3 additions & 1 deletion tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from synapse.types import ReadReceipt

from tests.unittest import HomeserverTestCase
from tests.unittest import HomeserverTestCase, override_config


class FederationSenderTestCases(HomeserverTestCase):
Expand All @@ -29,6 +29,7 @@ def make_homeserver(self, reactor, clock):
federation_transport_client=Mock(spec=["send_transaction"]),
)

@override_config({"send_federation": True})
def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
Expand Down Expand Up @@ -69,6 +70,7 @@ def test_send_receipts(self):
],
)

@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
Expand Down
63 changes: 63 additions & 0 deletions tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +17,8 @@

from synapse.events import FrozenEvent
from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
from synapse.rest.client.v1 import login, room

from tests import unittest

Expand All @@ -41,6 +44,66 @@ def test_block_ip_literals(self):
self.assertTrue(server_matches_acl_event("1:2:3:4", e))


class StateQueryTests(unittest.FederatingHomeserverTestCase):

servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def test_without_event_id(self):
"""
Querying v1/state/<room_id> without an event ID will return the current
known state.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.inject_room_member(room_1, "@user:other.example.com", "join")

request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)

self.assertEqual(
channel.json_body["room_version"],
self.hs.config.default_room_version.identifier,
)

members = set(
map(
lambda x: x["state_key"],
filter(
lambda x: x["type"] == "m.room.member", channel.json_body["pdus"]
),
)
)

self.assertEqual(members, set(["@user:other.example.com", u1]))
self.assertEqual(len(channel.json_body["pdus"]), 6)

def test_needs_to_be_in_room(self):
"""
Querying v1/state/<room_id> requires the server
be in the room to provide data.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)

request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")


def _create_acl_event(content):
return FrozenEvent(
{
Expand Down
3 changes: 3 additions & 0 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.types import UserID

from tests import unittest
from tests.unittest import override_config
from tests.utils import register_federation_servlets

# Some local users to test with
Expand Down Expand Up @@ -174,6 +175,7 @@ def test_started_typing_local(self):
],
)

@override_config({"send_federation": True})
def test_started_typing_remote_send(self):
self.room_members = [U_APPLE, U_ONION]

Expand Down Expand Up @@ -237,6 +239,7 @@ def test_started_typing_remote_recv(self):
],
)

@override_config({"send_federation": True})
def test_stopped_typing(self):
self.room_members = [U_APPLE, U_BANANA, U_ONION]

Expand Down
3 changes: 3 additions & 0 deletions tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ def prepare(self, reactor, clock, hs):
server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = server_factory.streamer

handler_factory = Mock()
self.replication_handler = ReplicationClientHandler(self.slaved_store)
self.replication_handler.factory = handler_factory

client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
Expand Down
4 changes: 4 additions & 0 deletions tests/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock

from synapse.replication.tcp.commands import ReplicateCommand
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
Expand All @@ -30,7 +32,9 @@ def prepare(self, reactor, clock, hs):
server = server_factory.buildProtocol(None)

# build a replication client, with a dummy handler
handler_factory = Mock()
self.test_handler = TestReplicationClientHandler()
self.test_handler.factory = handler_factory
self.client = ClientReplicationStreamProtocol(
"client", "test", clock, self.test_handler
)
Expand Down
26 changes: 1 addition & 25 deletions tests/storage/test_roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

from unittest.mock import Mock

from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.api.constants import Membership
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room
from synapse.types import Requester, UserID
Expand All @@ -44,9 +43,6 @@ def prepare(self, reactor, clock, hs):
# We can't test the RoomMemberStore on its own without the other event
# storage logic
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler()

self.u_alice = self.register_user("alice", "pass")
self.t_alice = self.login("alice", "pass")
Expand All @@ -55,26 +51,6 @@ def prepare(self, reactor, clock, hs):
# User elsewhere on another host
self.u_charlie = UserID.from_string("@charlie:elsewhere")

def inject_room_member(self, room, user, membership, replaces_state=None):
builder = self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)

event, context = self.get_success(
self.event_creation_handler.create_new_client_event(builder)
)

self.get_success(self.storage.persistence.persist_event(event, context))

return event

def test_one_member(self):

# Alice creates the room, and is automatically joined
Expand Down
Loading

0 comments on commit 0f87b91

Please sign in to comment.