Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

support federation queries through http connect proxy #9306

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9306.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for sending federation requests through a proxy. Contributed by @Bubu.
68 changes: 64 additions & 4 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import urllib.parse
from typing import Any, Generator, List, Optional
from urllib.request import getproxies, proxy_bypass

from netaddr import AddrFormatError, IPAddress, IPSet
from zope.interface import implementer
Expand All @@ -30,9 +31,12 @@
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer

from synapse.crypto.context_factory import FederationPolicyForHTTPS
from synapse.http import proxyagent
from synapse.http.client import BlacklistingAgentWrapper
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
from synapse.http.federation.srv_resolver import Server, SrvResolver
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.http.proxyagent import ProxyAgent, parse_username_password
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util import Clock
Expand Down Expand Up @@ -72,6 +76,7 @@ def __init__(
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_blacklist: IPSet,
proxy_reactor: Optional[ISynapseReactor] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add new params to the docstring (on the class, in this case). As well as making the code maintainable in future, it's important to help with review.

_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
Expand All @@ -82,10 +87,22 @@ def __init__(
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60

if proxy_reactor is None:
self.proxy_reactor = reactor
else:
self.proxy_reactor = proxy_reactor

proxies = getproxies()
https_proxy = proxies["https"].encode() if "https" in proxies else None

self._agent = Agent.usingEndpointFactory(
self._reactor,
MatrixHostnameEndpointFactory(
reactor, tls_client_options_factory, _srv_resolver
reactor,
self.proxy_reactor,
tls_client_options_factory,
_srv_resolver,
https_proxy,
),
pool=self._pool,
)
Expand All @@ -97,10 +114,12 @@ def __init__(
_well_known_resolver = WellKnownResolver(
self._reactor,
agent=BlacklistingAgentWrapper(
Agent(
ProxyAgent(
self._reactor,
self.proxy_reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
use_proxy=True,
),
ip_blacklist=ip_blacklist,
),
Expand Down Expand Up @@ -200,8 +219,10 @@ class MatrixHostnameEndpointFactory:
def __init__(
self,
reactor: IReactorCore,
proxy_reactor: IReactorCore,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
srv_resolver: Optional[SrvResolver],
https_proxy: Optional[bytes] = None,
):
self._reactor = reactor
self._tls_client_options_factory = tls_client_options_factory
Expand All @@ -210,13 +231,20 @@ def __init__(
srv_resolver = SrvResolver()

self._srv_resolver = srv_resolver
self.https_proxy_creds, https_proxy = parse_username_password(https_proxy)

self.https_proxy_endpoint = proxyagent.http_proxy_endpoint(
https_proxy, proxy_reactor
)
Bubu marked this conversation as resolved.
Show resolved Hide resolved

def endpointForURI(self, parsed_uri):
return MatrixHostnameEndpoint(
self._reactor,
self._tls_client_options_factory,
self._srv_resolver,
parsed_uri,
self.https_proxy_endpoint,
self.https_proxy_creds,
)


Expand All @@ -239,11 +267,17 @@ def __init__(
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
srv_resolver: SrvResolver,
parsed_uri: URI,
https_proxy_endpoint: Optional[IStreamClientEndpoint],
https_proxy_creds,
):
self._reactor = reactor

self._parsed_uri = parsed_uri

self.https_proxy_endpoint = https_proxy_endpoint

self.https_proxy_creds = https_proxy_creds

# set up the TLS connection params
#
# XXX disabling TLS is really only supported here for the benefit of the
Expand Down Expand Up @@ -273,9 +307,35 @@ async def _do_connect(self, protocol_factory: IProtocolFactory) -> None:
host = server.host
port = server.port

endpoint: IStreamClientEndpoint
try:
logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
endpoint = HostnameEndpoint(self._reactor, host, port)
if self.https_proxy_endpoint and not proxy_bypass(host.decode()):
logger.debug(
"Connecting to %s:%i via %s",
host.decode("ascii"),
port,
self.https_proxy_endpoint,
)
connect_headers = Headers()
# Determine whether we need to set Proxy-Authorization headers
if self.https_proxy_creds:
# Set a Proxy-Authorization header
connect_headers.addRawHeader(
b"Proxy-Authorization",
self.https_proxy_creds.as_proxy_authorization_value(),
)

endpoint = HTTPConnectProxyEndpoint(
self._reactor,
self.https_proxy_endpoint,
host,
port,
headers=connect_headers,
)
else:
logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
# not using a proxy
endpoint = HostnameEndpoint(self._reactor, host, port)
if self._tls_options:
endpoint = wrapClientTLS(self._tls_options, endpoint)
result = await make_deferred_yieldable(
Expand Down
1 change: 1 addition & 0 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ def __init__(self, hs, tls_client_options_factory):
tls_client_options_factory,
user_agent,
hs.config.federation_ip_range_blacklist,
proxy_reactor=hs.get_reactor(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we already pass the ip blacklist into MatrixFederationAgent, why not move the construction of BlacklistingReactorWrapper down to it, rather than having to pass in two reactors?

)

# Use a BlacklistingAgentWrapper to prevent circumventing the IP
Expand Down
10 changes: 6 additions & 4 deletions synapse/http/proxyagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ def __init__(
# Parse credentials from https proxy connection string if present
self.https_proxy_creds, https_proxy = parse_username_password(https_proxy)

self.http_proxy_endpoint = _http_proxy_endpoint(
self.http_proxy_endpoint = http_proxy_endpoint(
http_proxy, self.proxy_reactor, **self._endpoint_kwargs
)

self.https_proxy_endpoint = _http_proxy_endpoint(
self.https_proxy_endpoint = http_proxy_endpoint(
https_proxy, self.proxy_reactor, **self._endpoint_kwargs
)

Expand Down Expand Up @@ -243,7 +243,7 @@ def request(self, method, uri, headers=None, bodyProducer=None):
)


def _http_proxy_endpoint(proxy: Optional[bytes], reactor, **kwargs):
def http_proxy_endpoint(proxy: Optional[bytes], reactor, **kwargs):
"""Parses an http proxy setting and returns an endpoint for the proxy

Args:
Expand All @@ -267,7 +267,9 @@ def _http_proxy_endpoint(proxy: Optional[bytes], reactor, **kwargs):
return HostnameEndpoint(reactor, host, port, **kwargs)


def parse_username_password(proxy: bytes) -> Tuple[Optional[ProxyCredentials], bytes]:
def parse_username_password(
proxy: Optional[bytes],
) -> Tuple[Optional[ProxyCredentials], bytes]:
"""
Parses the username and password from a proxy declaration e.g
username:password@hostname:port.
Expand Down
105 changes: 86 additions & 19 deletions tests/http/federation/test_matrix_federation_agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# -*- coding: utf-8 -*-

This is no longer needed. See #9786

# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -12,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from unittest.mock import Mock
import os
from unittest.mock import patch

import treq
from mock import Mock
from netaddr import IPSet
from service_identity import VerificationError
from zope.interface import implementer
Expand Down Expand Up @@ -109,7 +111,7 @@ def setUp(self):
_well_known_resolver=self.well_known_resolver,
)

def _make_connection(self, client_factory, expected_sni):
def _make_connection(self, client_factory, expected_sni=None):
"""Builds a test server, and completes the outgoing client connection

Returns:
Expand Down Expand Up @@ -146,12 +148,13 @@ def _make_connection(self, client_factory, expected_sni):
self.reactor.pump((0.1,))

# check the SNI
server_name = server_tls_connection.get_servername()
self.assertEqual(
server_name,
expected_sni,
"Expected SNI %s but got %s" % (expected_sni, server_name),
)
if expected_sni is not None:
server_name = server_tls_connection.get_servername()
self.assertEqual(
server_name,
expected_sni,
"Expected SNI %s but got %s" % (expected_sni, server_name),
)

return http_protocol

Expand Down Expand Up @@ -179,11 +182,7 @@ def _make_get_request(self, uri):
_check_logcontext(context)

def _handle_well_known_connection(
self,
client_factory,
expected_sni,
content,
response_headers: Optional[dict] = None,
self, client_factory, expected_sni, content, response_headers={}
):
"""Handle an outgoing HTTPs connection: wire it up to a server, check that the
request is for a .well-known, and send the response.
Expand All @@ -205,20 +204,18 @@ def _handle_well_known_connection(
self.assertEqual(
request.requestHeaders.getRawHeaders(b"user-agent"), [b"test-agent"]
)
self._send_well_known_response(request, content, headers=response_headers or {})
self._send_well_known_response(request, content, headers=response_headers)
return well_known_server

def _send_well_known_response(
self, request, content, headers: Optional[dict] = None
):
def _send_well_known_response(self, request, content, headers={}):
"""Check that an incoming request looks like a valid .well-known request, and
send back the response.
"""
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/.well-known/matrix/server")
self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
# send back a response
for k, v in (headers or {}).items():
for k, v in headers.items():
request.setHeader(k, v)
request.write(content)
request.finish()
Expand Down Expand Up @@ -282,6 +279,76 @@ def test_get(self):
json = self.successResultOf(treq.json_content(response))
self.assertEqual(json, {"a": 1})

@patch.dict(os.environ, {"https_proxy": "proxy.com", "no_proxy": "unused.com"})
def test_get_via_proxy(self):
"""
test for federation request through proxy
"""
# recreate the agent with patched env
self.agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=self.tls_factory,
user_agent="test-agent", # Note that this is unused since _well_known_resolver is provided.
ip_blacklist=IPSet(),
_srv_resolver=self.mock_resolver,
_well_known_resolver=self.well_known_resolver,
)

self.reactor.lookups["testserv"] = "1.2.3.4"
self.reactor.lookups["proxy.com"] = "9.9.9.9"
test_d = self._make_get_request(b"matrix://testserv:8448/foo/bar")

# Nothing happened yet
self.assertNoResult(test_d)

# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients[0]
# make sure we are connecting to the proxy
self.assertEqual(host, "9.9.9.9")
self.assertEqual(port, 1080)

# make a test server, and wire up the client
http_server = self._make_connection(client_factory)

self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/foo/bar")
self.assertEqual(
request.requestHeaders.getRawHeaders(b"host"), [b"testserv:8448"]
)
self.assertEqual(
request.requestHeaders.getRawHeaders(b"user-agent"), [b"test-agent"]
)
content = request.content.read()
self.assertEqual(content, b"")

# Deferred is still without a result
self.assertNoResult(test_d)

# send the headers
request.responseHeaders.setRawHeaders(b"Content-Type", [b"application/json"])
request.write("")

self.reactor.pump((0.1,))

response = self.successResultOf(test_d)

# that should give us a Response object
self.assertEqual(response.code, 200)

# Send the body
request.write('{ "a": 1 }'.encode("ascii"))
request.finish()

self.reactor.pump((0.1,))

# check it can be read
json = self.successResultOf(treq.json_content(response))
self.assertEqual(json, {"a": 1})

def test_get_ip_address(self):
"""
Test the behaviour when the server name contains an explicit IP (with no port)
Expand Down