diff --git a/conf/broker.conf b/conf/broker.conf
index fd6bba0f45d2cb..5e6b8260ce7fc4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39cebe..05b44b2b5a2b3e 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5c94d63817a12f..e064002b455208 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,12 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for returning the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/pom.xml b/pom.xml
index 8a43e536cdb03e..85b54d574c8b7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.
1.5.4
5.4.0
2.33.2
+ 1.0.3
0.6.1
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2b58cbc2d11787..2bfc1b61eae6a1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -250,6 +250,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;
+ @FieldContext(category = CATEGORY_SERVER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Trust X-Forwarded-For header for returning the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index e15e024ea81584..35488779121994 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -176,6 +176,13 @@
test
+
+ io.github.hakky54
+ consolecaptor
+ ${consolecaptor.version}
+ test
+
+
io.streamnative.oxia
oxia-testcontainers
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 8dc36e2917ed1e..d65181a016fca6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -31,12 +31,16 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -103,9 +107,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
Optional port = config.getWebServicePort();
HttpConfiguration httpConfig = new HttpConfiguration();
+ if (config.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (port.isPresent()) {
- httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
@@ -144,7 +157,13 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
- httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
new file mode 100644
index 00000000000000..79f0dcf4c41e22
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import nl.altindag.console.ConsoleCaptor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class WebServiceOriginalClientIPTest extends MockedPulsarServiceBaseTest {
+ HttpClient httpClient;
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ httpClient = new HttpClient(new SslContextFactory(true));
+ httpClient.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ if (httpClient != null) {
+ httpClient.stop();
+ }
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setWebServiceTrustXForwardedFor(true);
+ conf.setWebServiceHaProxyProtocolEnabled(true);
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+ conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+ conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+ }
+
+ @DataProvider(name = "tlsEnabled")
+ public Object[][] tlsEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
+ String metricsUrl =
+ (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+ ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+ try {
+ Awaitility.await().untilAsserted(() -> {
+ consoleCaptor.clearOutput();
+
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(metricsUrl)
+ .header("X-Forwarded-For", "11.22.33.44")
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+ // Validate that the client IP passed in HA Proxy protocol is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("RequestLog") && line.contains("- 11.22.33.44")));
+ });
+ } finally {
+ consoleCaptor.close();
+ }
+ }
+
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
+ String metricsUrl = (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+ ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+ try {
+ Awaitility.await().untilAsserted(() -> {
+ consoleCaptor.clearOutput();
+
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(metricsUrl)
+ // Jetty client will add HA Proxy protocol header with the given IP to the request
+ .tag(new V2.Tag("99.22.33.44", 1234))
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+ // Validate that the client IP passed in HA Proxy protocol is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("RequestLog") && line.contains("- 99.22.33.44")));
+ });
+ } finally {
+ consoleCaptor.close();
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml
index 38a57df80d57b8..09a89702ee2ac9 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -23,7 +23,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
-
+
+
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 39c8fb5e086fd7..51a8061c3cb633 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -268,6 +268,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;
+ @FieldContext(category = CATEGORY_SERVER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the original client IP of http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Trust X-Forwarded-For header for returning the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index b95bbcab08b11c..5e076e589a5aa1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -37,13 +37,17 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -93,12 +97,21 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
List connectors = new ArrayList<>();
HttpConfiguration httpConfig = new HttpConfiguration();
+ if (config.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
- connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
@@ -133,7 +146,13 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
- connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);