Skip to content

Commit

Permalink
[improve][broker] Support X-Forwarded-For and HA Proxy protocol for r…
Browse files Browse the repository at this point in the history
…esolving the client IP of http/https requests
  • Loading branch information
lhotari committed Apr 18, 2024
1 parent 8ca01cd commit 0292dcd
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 5 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=

Expand Down
6 changes: 6 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.</description>
<jettison.version>1.5.4</jettison.version>
<woodstox.version>5.4.0</woodstox.version>
<wiremock.version>2.33.2</wiremock.version>
<consolecaptor.version>1.0.3</consolecaptor.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for returning the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
Expand Down
7 changes: 7 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>consolecaptor</artifactId>
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
Expand Down Expand Up @@ -103,9 +107,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {

Optional<Integer> port = config.getWebServicePort();
HttpConfiguration httpConfig = new HttpConfiguration();
if (config.isWebServiceTrustXForwardedFor()) {
httpConfig.addCustomizer(new ForwardedRequestCustomizer());
}
httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (port.isPresent()) {
httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(httpConnectionFactory);
httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
Expand Down Expand Up @@ -144,7 +157,13 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
connectionFactories.add(httpConnectionFactory);
httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import static org.testng.Assert.assertTrue;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import nl.altindag.console.ConsoleCaptor;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.awaitility.Awaitility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class WebServiceOriginalClientIPTest extends MockedPulsarServiceBaseTest {
HttpClient httpClient;

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
httpClient = new HttpClient(new SslContextFactory(true));
httpClient.start();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
if (httpClient != null) {
httpClient.stop();
}
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setWebServiceTrustXForwardedFor(true);
conf.setWebServiceHaProxyProtocolEnabled(true);
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
}

@DataProvider(name = "tlsEnabled")
public Object[][] tlsEnabled() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl =
(tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
.header("X-Forwarded-For", "11.22.33.44")
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 11.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}


@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl = (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
// Jetty client will add HA Proxy protocol header with the given IP to the request
.tag(new V2.Tag("99.22.33.44", 1234))
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 99.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}
}
3 changes: 2 additions & 1 deletion pulsar-broker/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<!-- setting follow="true" is required for using ConsoleCaptor to validate log messages -->
<Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
</Console>
</Appenders>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for returning the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
Expand Down Expand Up @@ -93,12 +97,21 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
List<ServerConnector> connectors = new ArrayList<>();

HttpConfiguration httpConfig = new HttpConfiguration();
if (config.isWebServiceTrustXForwardedFor()) {
httpConfig.addCustomizer(new ForwardedRequestCustomizer());
}
httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());

HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(httpConnectionFactory);
connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
Expand Down Expand Up @@ -133,7 +146,13 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
connectionFactories.add(httpConnectionFactory);
connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);
Expand Down

0 comments on commit 0292dcd

Please sign in to comment.