Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests #22524

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=

Expand Down
6 changes: 6 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.</description>
<jettison.version>1.5.4</jettison.version>
<woodstox.version>5.4.0</woodstox.version>
<wiremock.version>2.33.2</wiremock.version>
<consolecaptor.version>1.0.3</consolecaptor.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
Expand Down
7 changes: 7 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>consolecaptor</artifactId>
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
Expand Down Expand Up @@ -103,9 +108,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {

Optional<Integer> port = config.getWebServicePort();
HttpConfiguration httpConfig = new HttpConfiguration();
if (config.isWebServiceTrustXForwardedFor()) {
httpConfig.addCustomizer(new ForwardedRequestCustomizer());
}
httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (port.isPresent()) {
httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(httpConnectionFactory);
httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
Expand Down Expand Up @@ -144,7 +158,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
connectionFactories.add(httpConnectionFactory);
// org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
// this is needed for TLS authentication
if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
httpConfig.addCustomizer(new SecureRequestCustomizer());
}
httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import static org.testng.Assert.assertTrue;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import nl.altindag.console.ConsoleCaptor;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.awaitility.Awaitility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class WebServiceOriginalClientIPTest extends MockedPulsarServiceBaseTest {
HttpClient httpClient;

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
httpClient = new HttpClient(new SslContextFactory(true));
httpClient.start();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
if (httpClient != null) {
httpClient.stop();
}
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setWebServiceTrustXForwardedFor(true);
conf.setWebServiceHaProxyProtocolEnabled(true);
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
}

@DataProvider(name = "tlsEnabled")
public Object[][] tlsEnabled() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl =
(tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
.header("X-Forwarded-For", "11.22.33.44")
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 11.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}


@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl = (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
// Jetty client will add HA Proxy protocol header with the given IP to the request
.tag(new V2.Tag("99.22.33.44", 1234))
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 99.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}
}
3 changes: 2 additions & 1 deletion pulsar-broker/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<!-- setting follow="true" is required for using ConsoleCaptor to validate log messages -->
<Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
</Console>
</Appenders>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
Expand Down Expand Up @@ -93,12 +98,21 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
List<ServerConnector> connectors = new ArrayList<>();

HttpConfiguration httpConfig = new HttpConfiguration();
if (config.isWebServiceTrustXForwardedFor()) {
httpConfig.addCustomizer(new ForwardedRequestCustomizer());
}
httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());

HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(httpConnectionFactory);
connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
Expand Down Expand Up @@ -133,7 +147,18 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
List<ConnectionFactory> connectionFactories = new ArrayList<>();
if (config.isWebServiceHaProxyProtocolEnabled()) {
connectionFactories.add(new ProxyConnectionFactory());
}
connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
connectionFactories.add(httpConnectionFactory);
// org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
// this is needed for TLS authentication
if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
httpConfig.addCustomizer(new SecureRequestCustomizer());
}
connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);
Expand Down
Loading