Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation outbound proxy #15773

Merged
merged 47 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
eb6e132
Proxy federation requests
erikjohnston Mar 31, 2023
6a95e7a
Make configurable
erikjohnston Apr 28, 2023
f0270aa
Cache the fed proxy
erikjohnston May 10, 2023
6d98582
Accept a list of federation proxies
erikjohnston May 10, 2023
5889396
Make configurable
erikjohnston May 10, 2023
58fe4da
Comment
erikjohnston May 10, 2023
f00fedd
Remove unused class
erikjohnston May 15, 2023
41c5747
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 13, 2023
f219f0e
Add changelog
MadLittleMods Jun 13, 2023
c998d28
Avoid negated condition
MadLittleMods Jun 13, 2023
cc05c97
Fix tests and align to new `matrix-federation://` schema
MadLittleMods Jun 13, 2023
8cfad3d
Fix lints
MadLittleMods Jun 13, 2023
9eec614
WORKER PROXY WIP
erikjohnston May 10, 2023
e9e900f
Align scheme checking
MadLittleMods Jun 14, 2023
dcb4105
Fix lints
MadLittleMods Jun 14, 2023
c6dcd5e
Refactor tests to use `get_clock()`
MadLittleMods Jun 14, 2023
f139898
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 14, 2023
8f9f478
Fix tests (make sure `federation_http_client` is defined)
MadLittleMods Jun 14, 2023
e789c64
Fix tests
MadLittleMods Jun 14, 2023
0cead40
Fix lints
MadLittleMods Jun 14, 2023
11bf041
Maybe fix more replication tests
MadLittleMods Jun 14, 2023
d847564
Mark out spots to add docs
MadLittleMods Jun 15, 2023
74988e2
WIP: Very rough worker test
MadLittleMods Jun 16, 2023
6b44e66
Cleaned up test
MadLittleMods Jun 16, 2023
8af2fb8
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 20, 2023
1abd3b1
Clean up test
MadLittleMods Jun 20, 2023
477844c
Explain why we care about catching `PotentialDataLoss`
MadLittleMods Jun 20, 2023
dac5532
Add some more context
MadLittleMods Jun 20, 2023
cf208d2
Test error case
MadLittleMods Jun 20, 2023
e665fa8
Flesh out docstrings and comments
MadLittleMods Jun 20, 2023
2ce2025
Update docs
MadLittleMods Jun 20, 2023
632544a
Add some background behind `matrix-federation://`
MadLittleMods Jun 20, 2023
033e18a
Align language
MadLittleMods Jun 20, 2023
b5e916e
Revert back to debug level
MadLittleMods Jun 20, 2023
484680f
Merge branch 'develop' into erikj/fed_proxy
erikjohnston Jun 20, 2023
2032ea6
`master`/`main` is in the `instance_map` so no need to skip checking …
MadLittleMods Jun 21, 2023
926e3e0
Remove extra proxy logging
MadLittleMods Jun 21, 2023
0a2a9cf
Do not copy over hop-by-hop headers
MadLittleMods Jun 21, 2023
c757a38
Add tests for `parse_connection_header_value`
MadLittleMods Jun 21, 2023
be12f21
Add tests to make sure headers are removed
MadLittleMods Jun 21, 2023
735203e
Ignore lint
MadLittleMods Jun 21, 2023
9e3881f
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 21, 2023
c1ec014
Fix `arg-type` lint
MadLittleMods Jun 21, 2023
d400b50
Simplify `parse_connection_header_value`
MadLittleMods Jun 21, 2023
e99a5e9
Use safe `json.dumps` for JSON response
MadLittleMods Jun 27, 2023
074fe0c
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 28, 2023
d3292d2
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15773.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion contrib/lnav/synapse-log-format.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"level": "error"
},
{
"line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
"line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix-federation://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
"level": "warning"
},
{
Expand Down
4 changes: 2 additions & 2 deletions scripts-dev/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ def request(
authorization_headers.append(header)
print("Authorization: %s" % header, file=sys.stderr)

dest = "matrix://%s%s" % (destination, path)
dest = "matrix-federation://%s%s" % (destination, path)
print("Requesting %s" % dest, file=sys.stderr)

s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
s.mount("matrix-federation://", MatrixConnectionAdapter())

headers: Dict[str, str] = {
"Authorization": authorization_headers[0],
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def listen_unix(


def listen_http(
hs: "HomeServer",
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
Expand All @@ -406,6 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
federation_agent=hs.get_federation_http_client().agent,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
1 change: 1 addition & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_http(
self,
listener_config,
root_resource,
self.version_string,
Expand Down
1 change: 1 addition & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _listener_http(
root_resource = OptionsResource()

ports = listen_http(
self,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
Expand Down
36 changes: 35 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
Expand Down Expand Up @@ -148,6 +148,23 @@ class WriterLocations:
)


@attr.s(auto_attribs=True)
class OutboundFederationRestrictedTo:
"""Whether we limit outbound federation to a certain set of instances.

Attributes:
instances: optional list of instances that can make outbound federation
requests. If None then all instances can make federation requests.
locations: list of instance locations to connect to proxy via.
"""

instances: Optional[List[str]]
locations: List[InstanceLocationConfig] = attr.Factory(list)

def __contains__(self, instance: str) -> bool:
return self.instances is None or instance in self.instances


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -357,6 +374,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
new_option_name="update_user_directory_from_worker",
)

outbound_federation_restricted_to = config.get(
"outbound_federation_restricted_to", None
)
self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
for instance in outbound_federation_restricted_to:
if instance != "master" and instance not in self.instance_map:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
raise ConfigError(
"Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
% (instance,)
)
self.outbound_federation_restricted_to.locations.append(
self.instance_map[instance]
)

def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
Expand Down
10 changes: 5 additions & 5 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
@implementer(IAgent)
class MatrixFederationAgent:
"""An Agent-like thing which provides a `request` method which correctly
handles resolving matrix server names when using matrix://. Handles standard
handles resolving matrix server names when using matrix-federation://. Handles standard
https URIs as normal.

Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
Expand Down Expand Up @@ -167,14 +167,14 @@ def request(
# There must be a valid hostname.
assert parsed_uri.hostname

# If this is a matrix:// URI check if the server has delegated matrix
# If this is a matrix-federation:// URI check if the server has delegated matrix
# traffic using well-known delegation.
#
# We have to do this here and not in the endpoint as we need to rewrite
# the host header with the delegated server name.
delegated_server = None
if (
parsed_uri.scheme == b"matrix"
parsed_uri.scheme == b"matrix-federation"
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
Expand Down Expand Up @@ -250,7 +250,7 @@ def endpointForURI(self, parsed_uri: URI) -> "MatrixHostnameEndpoint":

@implementer(IStreamClientEndpoint)
class MatrixHostnameEndpoint:
"""An endpoint that resolves matrix:// URLs using Matrix server name
"""An endpoint that resolves matrix-federation:// URLs using Matrix server name
resolution (i.e. via SRV). Does not check for well-known delegation.

Args:
Expand Down Expand Up @@ -379,7 +379,7 @@ async def _resolve_server(self) -> List[Server]:
connect to.
"""

if self._parsed_uri.scheme != b"matrix":
if self._parsed_uri.scheme != b"matrix-federation":
return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]

# Note: We don't do well-known lookup as that needs to have happened
Expand Down
38 changes: 29 additions & 9 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
Expand All @@ -72,6 +72,7 @@
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -172,7 +173,14 @@ def __attrs_post_init__(self) -> None:

# The object is frozen so we can pre-compute this.
uri = urllib.parse.urlunparse(
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
(
b"matrix-federation",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
destination_bytes,
path_bytes,
None,
query_bytes,
b"",
)
)
object.__setattr__(self, "uri", uri)

Expand Down Expand Up @@ -386,17 +394,29 @@ def __init__(
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)

federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
outbound_federation_restricted_to = (
hs.config.worker.outbound_federation_restricted_to
)
if hs.get_instance_name() in outbound_federation_restricted_to:
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
)
else:
federation_proxies = outbound_federation_restricted_to.locations
federation_agent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
self.agent = BlocklistingAgentWrapper(
self.agent: IAgent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
Expand Down
151 changes: 151 additions & 0 deletions synapse/http/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Optional, Tuple, cast

from twisted.internet import protocol
from twisted.internet.interfaces import ITCPTransport
from twisted.internet.protocol import connectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IResponse
from twisted.web.resource import IResource
from twisted.web.server import Site

from synapse.http import QuieterFileBodyProducer
from synapse.http.server import _AsyncResource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util.async_helpers import timeout_deferred

if TYPE_CHECKING:
from synapse.http.site import SynapseRequest

logger = logging.getLogger(__name__)


class ProxyResource(_AsyncResource):
isLeaf = True

def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
super().__init__(True)

self.reactor = reactor
self.agent = federation_agent

async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
uri = urllib.parse.urlparse(request.uri)
assert uri.scheme == b"matrix-federation"

logger.info("Got proxy request %s", request.uri)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

headers = Headers()
for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
header_value = request.getHeader(header_name)
if header_value:
headers.addRawHeader(header_name, header_value)

request_deferred = run_in_background(
self.agent.request,
request.method,
request.uri,
headers=headers,
bodyProducer=QuieterFileBodyProducer(request.content),
)
request_deferred = timeout_deferred(
request_deferred,
timeout=90,
reactor=self.reactor,
)

response = await make_deferred_yieldable(request_deferred)

logger.info("Got proxy response %s", response.code)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

return response.code, response

def _send_response(
self,
request: "SynapseRequest",
code: int,
response_object: Any,
) -> None:
response = cast(IResponse, response_object)

request.setResponseCode(code)

# Copy headers.
for k, v in response.headers.getAllRawHeaders():
request.responseHeaders.setRawHeaders(k, v)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

response.deliverBody(_ProxyResponseBody(request))

def _send_error_response(
self,
f: failure.Failure,
request: "SynapseRequest",
) -> None:
request.setResponseCode(502)
request.finish()


class _ProxyResponseBody(protocol.Protocol):
transport: Optional[ITCPTransport] = None

def __init__(self, request: "SynapseRequest") -> None:
self._request = request

def dataReceived(self, data: bytes) -> None:
if self._request._disconnected and self.transport is not None:
self.transport.abortConnection()
return

self._request.write(data)

def connectionLost(self, reason: Failure = connectionDone) -> None:
if self._request.finished:
return

if reason.check(ResponseDone):
self._request.finish()
elif reason.check(PotentialDataLoss):
# TODO: ARGH
self._request.finish()
Copy link
Contributor Author

@MadLittleMods MadLittleMods Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to worry about this here. Does Synapse ever omit the Content-Length or Transfer-Encoding headers?

# This applies to requests which don't set `Content-Length` or a
# `Transfer-Encoding` in the response because in this case the end of the
# response is indicated by the connection being closed, an event which may
# also be due to a transient network problem or other error. But since this
# behavior is expected of some servers (like YouTube), let's ignore it.
# Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840

I've added the context for why we handle this scenario in synapse/http/client.py where it was originally used because some URL previews contact sites like YouTube which omit those headers sometimes.

else:
self._request.transport.abortConnection()


class ProxySite(Site):
def __init__(
self,
resource: IResource,
reactor: ISynapseReactor,
federation_agent: IAgent,
):
super().__init__(resource, reactor=reactor)

self._proxy_resource = ProxyResource(reactor, federation_agent)

def getResourceFor(self, request: "SynapseRequest") -> IResource:
uri = urllib.parse.urlparse(request.uri)
if uri.scheme == b"matrix-federation":
return self._proxy_resource

return super().getResourceFor(request)
Loading