Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation outbound proxy #15773

Merged
merged 47 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
eb6e132
Proxy federation requests
erikjohnston Mar 31, 2023
6a95e7a
Make configurable
erikjohnston Apr 28, 2023
f0270aa
Cache the fed proxy
erikjohnston May 10, 2023
6d98582
Accept a list of federation proxies
erikjohnston May 10, 2023
5889396
Make configurable
erikjohnston May 10, 2023
58fe4da
Comment
erikjohnston May 10, 2023
f00fedd
Remove unused class
erikjohnston May 15, 2023
41c5747
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 13, 2023
f219f0e
Add changelog
MadLittleMods Jun 13, 2023
c998d28
Avoid negated condition
MadLittleMods Jun 13, 2023
cc05c97
Fix tests and align to new `matrix-federation://` schema
MadLittleMods Jun 13, 2023
8cfad3d
Fix lints
MadLittleMods Jun 13, 2023
9eec614
WORKER PROXY WIP
erikjohnston May 10, 2023
e9e900f
Align scheme checking
MadLittleMods Jun 14, 2023
dcb4105
Fix lints
MadLittleMods Jun 14, 2023
c6dcd5e
Refactor tests to use `get_clock()`
MadLittleMods Jun 14, 2023
f139898
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 14, 2023
8f9f478
Fix tests (make sure `federation_http_client` is defined)
MadLittleMods Jun 14, 2023
e789c64
Fix tests
MadLittleMods Jun 14, 2023
0cead40
Fix lints
MadLittleMods Jun 14, 2023
11bf041
Maybe fix more replication tests
MadLittleMods Jun 14, 2023
d847564
Mark out spots to add docs
MadLittleMods Jun 15, 2023
74988e2
WIP: Very rough worker test
MadLittleMods Jun 16, 2023
6b44e66
Cleaned up test
MadLittleMods Jun 16, 2023
8af2fb8
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 20, 2023
1abd3b1
Clean up test
MadLittleMods Jun 20, 2023
477844c
Explain why we care about catching `PotentialDataLoss`
MadLittleMods Jun 20, 2023
dac5532
Add some more context
MadLittleMods Jun 20, 2023
cf208d2
Test error case
MadLittleMods Jun 20, 2023
e665fa8
Flesh out docstrings and comments
MadLittleMods Jun 20, 2023
2ce2025
Update docs
MadLittleMods Jun 20, 2023
632544a
Add some background behind `matrix-federation://`
MadLittleMods Jun 20, 2023
033e18a
Align language
MadLittleMods Jun 20, 2023
b5e916e
Revert back to debug level
MadLittleMods Jun 20, 2023
484680f
Merge branch 'develop' into erikj/fed_proxy
erikjohnston Jun 20, 2023
2032ea6
`master`/`main` is in the `instance_map` so no need to skip checking …
MadLittleMods Jun 21, 2023
926e3e0
Remove extra proxy logging
MadLittleMods Jun 21, 2023
0a2a9cf
Do not copy over hop-by-hop headers
MadLittleMods Jun 21, 2023
c757a38
Add tests for `parse_connection_header_value`
MadLittleMods Jun 21, 2023
be12f21
Add tests to make sure headers are removed
MadLittleMods Jun 21, 2023
735203e
Ignore lint
MadLittleMods Jun 21, 2023
9e3881f
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 21, 2023
c1ec014
Fix `arg-type` lint
MadLittleMods Jun 21, 2023
d400b50
Simplify `parse_connection_header_value`
MadLittleMods Jun 21, 2023
e99a5e9
Use safe `json.dumps` for JSON response
MadLittleMods Jun 27, 2023
074fe0c
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jun 28, 2023
d3292d2
Merge branch 'develop' into erikj/fed_proxy
MadLittleMods Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15773.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 24 additions & 7 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3930,13 +3930,14 @@ federation_sender_instances:
---
### `instance_map`

When using workers this should be a map from [`worker_name`](#worker_name) to the
HTTP replication listener of the worker, if configured, and to the main process.
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
a HTTP replication listener, and that listener should be included in the `instance_map`.
The main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is declared
inside the `listener` block for a `replication` listener.
When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
replication listener of the worker, if configured, and to the main process. Each worker
declared under [`stream_writers`](../../workers.md#stream-writers) and
[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that
listener should be included in the `instance_map`. The main process also needs an entry
on the `instance_map`, and it should be listed under `main` **if even one other worker
exists**. Ensure the port matches with what is declared inside the `listener` block for
a `replication` listener.


Example configuration:
Expand Down Expand Up @@ -3966,6 +3967,22 @@ stream_writers:
typing: worker1
```
---
### `outbound_federation_restricted_to`

When using workers, you can restrict outbound federation traffic to only go through a
specific subset of workers. Any worker specified here must also be in the
[`instance_map`](#instance_map).

```yaml
outbound_federation_restricted_to:
- federation_sender1
- federation_sender2
```

Also see the [worker
documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
for more info.
---
### `run_background_tasks_on`

The [worker](../../workers.md#background-tasks) that is used to run
Expand Down
20 changes: 20 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,26 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

#### Restrict outbound federation traffic to a specific set of workers
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

The `outbound_federation_restricted_to` configuration is useful to make sure outbound
federation traffic only goes through a specified subset of workers. This allows you to
set more strict access controls (like a firewall) for all workers and only allow the
`federation_sender`'s to contact the outside world.

```yaml
instance_map:
main:
host: localhost
port: 8030
federation_sender1:
host: localhost
port: 8034

outbound_federation_restricted_to:
- federation_sender1
```

#### Background tasks

There is also support for moving background tasks to a separate
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def listen_unix(


def listen_http(
hs: "HomeServer",
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
Expand All @@ -406,6 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
federation_agent=hs.get_federation_http_client().agent,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
1 change: 1 addition & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_http(
self,
listener_config,
root_resource,
self.version_string,
Expand Down
1 change: 1 addition & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _listener_http(
root_resource = OptionsResource()

ports = listen_http(
self,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
Expand Down
40 changes: 39 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
Expand Down Expand Up @@ -148,6 +148,27 @@ class WriterLocations:
)


@attr.s(auto_attribs=True)
class OutboundFederationRestrictedTo:
"""Whether we limit outbound federation to a certain set of instances.

Attributes:
instances: optional list of instances that can make outbound federation
requests. If None then all instances can make federation requests.
locations: list of instance locations to connect to proxy via.
"""

instances: Optional[List[str]]
locations: List[InstanceLocationConfig] = attr.Factory(list)

def __contains__(self, instance: str) -> bool:
# It feels a bit dirty to return `True` if `instances` is `None`, but it makes
# sense in downstream usage in the sense that if
# `outbound_federation_restricted_to` is not configured, then any instance can
# talk to federation (no restrictions so always return `True`).
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
return self.instances is None or instance in self.instances


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -357,6 +378,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
new_option_name="update_user_directory_from_worker",
)

outbound_federation_restricted_to = config.get(
"outbound_federation_restricted_to", None
)
self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
for instance in outbound_federation_restricted_to:
if instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
% (instance,)
)
self.outbound_federation_restricted_to.locations.append(
self.instance_map[instance]
)

def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
Expand Down
7 changes: 6 additions & 1 deletion synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,12 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
if reason.check(ResponseDone):
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
# stolen from https://github.com/twisted/treq/pull/49/files
# This applies to requests which don't set `Content-Length` or a
# `Transfer-Encoding` in the response because in this case the end of the
# response is indicated by the connection being closed, an event which may
# also be due to a transient network problem or other error. But since this
# behavior is expected of some servers (like YouTube), let's ignore it.
# Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
else:
Expand Down
132 changes: 122 additions & 10 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
Expand All @@ -72,6 +72,7 @@
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand Down Expand Up @@ -393,17 +394,32 @@ def __init__(
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)

federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
outbound_federation_restricted_to = (
hs.config.worker.outbound_federation_restricted_to
)
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
)
else:
# We need to talk to federation via the proxy via one of the configured
# locations
federation_proxies = outbound_federation_restricted_to.locations
federation_agent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
self.agent = BlocklistingAgentWrapper(
self.agent: IAgent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
Expand All @@ -412,7 +428,6 @@ def __init__(
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000

self.max_long_retry_delay_seconds = (
hs.config.federation.max_long_retry_delay_ms / 1000
)
Expand Down Expand Up @@ -1131,6 +1146,101 @@ async def get_json(
Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.

Raises:
HttpResponseException: If we get an HTTP response code >= 300
(except 429).
NotRetryingDestination: If we are not yet ready to retry this
server.
FederationDeniedError: If this destination is not on our
federation whitelist
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
json_dict, _ = await self.get_json_with_headers(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
try_trailing_slash_on_400=try_trailing_slash_on_400,
parser=parser,
)
return json_dict

@overload
async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = None,
retry_on_dns_fail: bool = True,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
...

@overload
async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = ...,
retry_on_dns_fail: bool = ...,
timeout: Optional[int] = ...,
ignore_backoff: bool = ...,
try_trailing_slash_on_400: bool = ...,
parser: ByteParser[T] = ...,
) -> Tuple[T, Dict[bytes, List[bytes]]]:
...

async def get_json_with_headers(
self,
destination: str,
path: str,
args: Optional[QueryParams] = None,
retry_on_dns_fail: bool = True,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
"""GETs some json from the given host homeserver and path

Args:
destination: The remote server to send the HTTP request to.

path: The HTTP path.

args: A dictionary used to create query strings, defaults to
None.

retry_on_dns_fail: true if the request should be retried on DNS failures

timeout: number of milliseconds to wait for the response.
self._default_timeout (60s) by default.

Note that we may make several attempts to send the request; this
timeout applies to the time spent waiting for response headers for
*each* attempt (including connection time) as well as the time spent
reading the response body after a 200 response.

ignore_backoff: true to ignore the historical backoff data
and try the request anyway.

try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end of
the request. Workaround for #3622 in Synapse <= v0.99.3.

parser: The parser to use to decode the response. Defaults to
parsing as JSON.

Returns:
Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
decoded JSON body and a dict of the response headers.

Raises:
HttpResponseException: If we get an HTTP response code >= 300
(except 429).
Expand All @@ -1156,6 +1266,8 @@ async def get_json(
timeout=timeout,
)

headers = dict(response.headers.getAllRawHeaders())

if timeout is not None:
_sec_timeout = timeout / 1000
else:
Expand All @@ -1173,7 +1285,7 @@ async def get_json(
parser=parser,
)

return body
return body, headers

async def delete_json(
self,
Expand Down
Loading