diff --git a/conf/broker.conf b/conf/broker.conf
index fd6bba0f45d2c..d482f77da7cb5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 3871c74a88778..6f995576ebd64 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
# The Configuration metadata store url
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39ceb..6e6c960e8009e 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5c94d63817a12..b04e5ccefa640 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 490cff2722ee5..9051f3b590c8e 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -46,6 +46,16 @@ statusFilePath=
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
# Name of the pulsar cluster to connect to
clusterName=
diff --git a/pom.xml b/pom.xml
index c7fba94abd8ea..d4b14efc356ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.
1.5.4
5.4.0
2.33.2
+ 1.0.3
0.6.1
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2b58cbc2d1178..156c83bd6960c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -250,6 +250,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;
+ @FieldContext(category = CATEGORY_SERVER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ + "is enabled.")
+ private Boolean webServiceLogDetailedAddresses;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
index e5daa5852b51f..fc88647eb49ea 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
@@ -18,9 +18,23 @@
*/
package org.apache.pulsar.broker.web;
+import java.net.InetSocketAddress;
import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.RequestLog;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
+import org.eclipse.jetty.util.HostPort;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
/**
* Class to standardize initialization of a Jetty request logger for all pulsar components.
@@ -58,7 +72,184 @@ public class JettyRequestLogFactory {
* Build a new Jetty request logger using the format defined in this class.
* @return a request logger
*/
- public static CustomRequestLog createRequestLogger() {
- return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+ public static RequestLog createRequestLogger() {
+ return createRequestLogger(false, null);
+ }
+
+ /**
+ * Build a new Jetty request logger using the format defined in this class.
+ * @param showDetailedAddresses whether to show detailed addresses and ports in logs
+ * @return a request logger
+ */
+ public static RequestLog createRequestLogger(boolean showDetailedAddresses, Server server) {
+ if (!showDetailedAddresses) {
+ return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
+ } else {
+ return new OriginalClientIPRequestLog(server);
+ }
+ }
+
+ /**
+ * Logs the original and real remote (client) and local (server) IP addresses
+ * when detailed addresses are enabled.
+ * Tracks the real addresses of remote and local using a registered Connection.Listener
+ * when detailed addresses are enabled.
+ * This is necessary when Proxy Protocol is used to pass the original client IP.
+ */
+ @Slf4j
+ private static class OriginalClientIPRequestLog extends ContainerLifeCycle implements RequestLog {
+ private final ThreadLocal requestLogStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
+ private final CustomRequestLog delegate;
+ private final Slf4jRequestLogWriter delegateLogWriter;
+
+ OriginalClientIPRequestLog(Server server) {
+ delegate = new CustomRequestLog(this::write, LOG_FORMAT);
+ addBean(delegate);
+ delegateLogWriter = new Slf4jRequestLogWriter();
+ addBean(delegateLogWriter);
+ if (server != null) {
+ for (Connector connector : server.getConnectors()) {
+ // adding the listener is only necessary for connectors that use ProxyConnectionFactory
+ if (connector.getDefaultConnectionFactory() instanceof ProxyConnectionFactory) {
+ connector.addBean(proxyProtocolOriginalEndpointListener);
+ }
+ }
+ }
+ }
+
+ void write(String requestEntry) {
+ StringBuilder sb = requestLogStringBuilder.get();
+ sb.setLength(0);
+ sb.append(requestEntry);
+ }
+
+ @Override
+ public void log(Request request, Response response) {
+ delegate.log(request, response);
+ StringBuilder sb = requestLogStringBuilder.get();
+ sb.append(" [R:");
+ sb.append(request.getRemoteHost());
+ sb.append(':');
+ sb.append(request.getRemotePort());
+ InetSocketAddress realRemoteAddress = lookupRealAddress(request.getHttpChannel().getRemoteAddress());
+ if (realRemoteAddress != null) {
+ String realRemoteHost = HostPort.normalizeHost(realRemoteAddress.getHostString());
+ int realRemotePort = realRemoteAddress.getPort();
+ if (!realRemoteHost.equals(request.getRemoteHost()) || realRemotePort != request.getRemotePort()) {
+ sb.append(" via ");
+ sb.append(realRemoteHost);
+ sb.append(':');
+ sb.append(realRemotePort);
+ }
+ }
+ sb.append("]->[L:");
+ InetSocketAddress realLocalAddress = lookupRealAddress(request.getHttpChannel().getLocalAddress());
+ if (realLocalAddress != null) {
+ String realLocalHost = HostPort.normalizeHost(realLocalAddress.getHostString());
+ int realLocalPort = realLocalAddress.getPort();
+ sb.append(realLocalHost);
+ sb.append(':');
+ sb.append(realLocalPort);
+ if (!realLocalHost.equals(request.getLocalAddr()) || realLocalPort != request.getLocalPort()) {
+ sb.append(" dst ");
+ sb.append(request.getLocalAddr());
+ sb.append(':');
+ sb.append(request.getLocalPort());
+ }
+ } else {
+ sb.append(request.getLocalAddr());
+ sb.append(':');
+ sb.append(request.getLocalPort());
+ }
+ sb.append(']');
+ try {
+ delegateLogWriter.write(sb.toString());
+ } catch (Exception e) {
+ log.warn("Failed to write request log", e);
+ }
+ }
+
+ private InetSocketAddress lookupRealAddress(InetSocketAddress socketAddress) {
+ if (socketAddress == null) {
+ return null;
+ }
+ if (proxyProtocolRealAddressMapping.isEmpty()) {
+ return socketAddress;
+ }
+ AddressEntry entry = proxyProtocolRealAddressMapping.get(new AddressKey(socketAddress.getHostString(),
+ socketAddress.getPort()));
+ if (entry != null) {
+ return entry.realAddress;
+ } else {
+ return socketAddress;
+ }
+ }
+
+ private final Connection.Listener proxyProtocolOriginalEndpointListener =
+ new ProxyProtocolOriginalEndpointListener();
+
+ private final ConcurrentHashMap proxyProtocolRealAddressMapping =
+ new ConcurrentHashMap<>();
+
+ // Use a record as key since InetSocketAddress hash code changes if the address gets resolved
+ record AddressKey(String hostString, int port) {
+
+ }
+
+ record AddressEntry(InetSocketAddress realAddress, AtomicInteger referenceCount) {
+
+ }
+
+ // Tracks the real addresses of remote and local when detailed addresses are enabled.
+ // This is necessary when Proxy Protocol is used to pass the original client IP.
+ // The Proxy Protocol implementation in Jetty wraps the original endpoint with a ProxyEndPoint
+ // and the real endpoint information isn't available in the request object.
+ // This listener is added to all connectors to track the real addresses of the client and server.
+ class ProxyProtocolOriginalEndpointListener implements Connection.Listener {
+ @Override
+ public void onOpened(Connection connection) {
+ handleConnection(connection, true);
+ }
+
+ @Override
+ public void onClosed(Connection connection) {
+ handleConnection(connection, false);
+ }
+
+ private void handleConnection(Connection connection, boolean increment) {
+ if (connection.getEndPoint() instanceof ProxyConnectionFactory.ProxyEndPoint) {
+ ProxyConnectionFactory.ProxyEndPoint proxyEndPoint =
+ (ProxyConnectionFactory.ProxyEndPoint) connection.getEndPoint();
+ EndPoint originalEndpoint = proxyEndPoint.unwrap();
+ mapAddress(proxyEndPoint.getLocalAddress(), originalEndpoint.getLocalAddress(), increment);
+ mapAddress(proxyEndPoint.getRemoteAddress(), originalEndpoint.getRemoteAddress(), increment);
+ }
+ }
+
+ private void mapAddress(InetSocketAddress current, InetSocketAddress real, boolean increment) {
+ // don't add the mapping if the current address is the same as the real address
+ if (real != null && current != null && current.equals(real)) {
+ return;
+ }
+ AddressKey key = new AddressKey(current.getHostString(), current.getPort());
+ proxyProtocolRealAddressMapping.compute(key, (__, entry) -> {
+ if (entry == null) {
+ if (increment) {
+ entry = new AddressEntry(real, new AtomicInteger(1));
+ }
+ } else {
+ if (increment) {
+ entry.referenceCount.incrementAndGet();
+ } else {
+ if (entry.referenceCount.decrementAndGet() == 0) {
+ // remove the entry if the reference count drops to 0
+ entry = null;
+ }
+ }
+ }
+ return entry;
+ });
+ }
+ }
}
}
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index e15e024ea8158..3548877912199 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -176,6 +176,13 @@
test
+
+ io.github.hakky54
+ consolecaptor
+ ${consolecaptor.version}
+ test
+
+
io.streamnative.oxia
oxia-testcontainers
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 8dc36e2917ed1..9a439268a8b4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -31,12 +31,18 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.RequestLog;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
@@ -103,9 +109,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
Optional port = config.getWebServicePort();
HttpConfiguration httpConfig = new HttpConfiguration();
+ if (config.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
httpConfig.setRequestHeaderSize(pulsar.getConfig().getHttpMaxRequestHeaderSize());
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (port.isPresent()) {
- httpConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpConnector.setPort(port.get());
httpConnector.setHost(pulsar.getBindAddress());
connectors.add(httpConnector);
@@ -144,7 +159,18 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
- httpsConnector = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+ // this is needed for TLS authentication
+ if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ }
+ httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
connectors.add(httpsConnector);
@@ -284,7 +310,12 @@ public void addStaticResources(String basePath, String resourcePath) {
public void start() throws PulsarServerException {
try {
RequestLogHandler requestLogHandler = new RequestLogHandler();
- requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+ boolean showDetailedAddresses = pulsar.getConfiguration().getWebServiceLogDetailedAddresses() != null
+ ? pulsar.getConfiguration().getWebServiceLogDetailedAddresses() :
+ (pulsar.getConfiguration().isWebServiceHaProxyProtocolEnabled()
+ || pulsar.getConfiguration().isWebServiceTrustXForwardedFor());
+ RequestLog requestLogger = JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server);
+ requestLogHandler.setRequestLog(requestLogger);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
new file mode 100644
index 0000000000000..7f7fa85bd3bb4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import nl.altindag.console.ConsoleCaptor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.assertj.core.api.ThrowingConsumer;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class WebServiceOriginalClientIPTest extends MockedPulsarServiceBaseTest {
+ HttpClient httpClient;
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ httpClient = new HttpClient(new SslContextFactory(true));
+ httpClient.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ if (httpClient != null) {
+ httpClient.stop();
+ }
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setWebServiceTrustXForwardedFor(true);
+ conf.setWebServiceHaProxyProtocolEnabled(true);
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+ conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+ conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+ }
+
+ @DataProvider(name = "tlsEnabled")
+ public Object[][] tlsEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
+ String metricsUrl =
+ (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+ performLoggingTest(consoleCaptor -> {
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(metricsUrl)
+ .header("X-Forwarded-For", "11.22.33.44:12345")
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+ // Validate that the client IP passed in X-Forwarded-For is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:11.22.33.44:12345 via ")));
+ });
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromForwardedHeaderAndLogged(boolean tlsEnabled) throws Exception {
+ String metricsUrl =
+ (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+ performLoggingTest(consoleCaptor -> {
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(metricsUrl)
+ .header("Forwarded", "for=11.22.33.44:12345")
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+ // Validate that the client IP passed in Forwarded is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:11.22.33.44:12345 via ")));
+ });
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
+ String metricsUrl = (tlsEnabled ? pulsar.getWebServiceAddressTls() : pulsar.getWebServiceAddress()) + "/metrics/";
+ performLoggingTest(consoleCaptor -> {
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(metricsUrl)
+ // Jetty client will add HA Proxy protocol header with the given IP to the request
+ .tag(new V2.Tag(V2.Tag.Command.PROXY, null, V2.Tag.Protocol.STREAM,
+ // source IP and port
+ "99.22.33.44", 1234,
+ // destination IP and port
+ "5.4.3.1", 4321,
+ null))
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));
+
+ // Validate that the client IP and destination IP passed in HA Proxy protocol is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("RequestLog") && line.contains("[R:99.22.33.44:1234 via ")
+ && line.contains(" dst 5.4.3.1:4321]")));
+ });
+ }
+
+ void performLoggingTest(ThrowingConsumer testFunction) {
+ ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+ try {
+ Awaitility.await().atMost(Duration.of(2, ChronoUnit.SECONDS)).untilAsserted(() -> {
+ consoleCaptor.clearOutput();
+ testFunction.accept(consoleCaptor);
+ });
+ } finally {
+ consoleCaptor.close();
+ System.out.println("--- Captured console output:");
+ consoleCaptor.getStandardOutput().forEach(System.out::println);
+ consoleCaptor.getErrorOutput().forEach(System.err::println);
+ System.out.println("--- End of captured console output");
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml
index 38a57df80d57b..09a89702ee2ac 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -23,7 +23,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
-
+
+
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index ec0e620d0ae8b..036311ea13230 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -163,6 +163,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
+ "(0 to disable limiting)")
private int maxHttpServerConnections = 2048;
+ @FieldContext(category = CATEGORY_WORKER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_WORKER, doc =
+ "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
+ @FieldContext(category = CATEGORY_WORKER, doc =
+ "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ + "is enabled.")
+ private Boolean webServiceLogDetailedAddresses;
+
@FieldContext(
category = CATEGORY_WORKER,
required = false,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 2b3ea30121015..583d8ce558b08 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -35,10 +35,17 @@
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -88,10 +95,21 @@ private void init() {
server.addBean(new ConnectionLimit(workerConfig.getMaxHttpServerConnections(), server));
}
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ if (workerConfig.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
+
List connectors = new ArrayList<>();
if (this.workerConfig.getWorkerPort() != null) {
log.info("Configuring http server on port={}", this.workerConfig.getWorkerPort());
- httpConnector = new ServerConnector(server);
+ List connectionFactories = new ArrayList<>();
+ if (workerConfig.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpConnector.setPort(this.workerConfig.getWorkerPort());
connectors.add(httpConnector);
}
@@ -109,7 +127,10 @@ private void init() {
workerConfig.isAuthenticateMetricsEndpoint(), filterInitializer));
RequestLogHandler requestLogHandler = new RequestLogHandler();
- requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+ boolean showDetailedAddresses = workerConfig.getWebServiceLogDetailedAddresses() != null
+ ? workerConfig.getWebServiceLogDetailedAddresses() :
+ (workerConfig.isWebServiceHaProxyProtocolEnabled() || workerConfig.isWebServiceTrustXForwardedFor());
+ requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
@@ -161,7 +182,18 @@ private void init() {
workerConfig.getTlsCertRefreshCheckDurationSec()
);
}
- httpsConnector = new ServerConnector(server, sslCtxFactory);
+ List connectionFactories = new ArrayList<>();
+ if (workerConfig.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+ // this is needed for TLS authentication
+ if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ }
+ httpsConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
connectors.add(httpsConnector);
} catch (Exception e) {
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 64ca301facf4d..a30e23b8d4781 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -209,6 +209,12 @@
${wiremock.version}
test
+
+ io.github.hakky54
+ consolecaptor
+ ${consolecaptor.version}
+ test
+
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 39c8fb5e086fd..d65408748f432 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -268,6 +268,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;
+ @FieldContext(category = CATEGORY_SERVER,
+ doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ + "is enabled.")
+ private Boolean webServiceLogDetailedAddresses;
+
@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 50a8e3ab7d753..10121e7f5d61d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
@@ -109,8 +110,15 @@ public class ProxyServiceStarter {
private WebServer server;
private WebSocketService webSocketService;
private static boolean metricsInitialized;
+ private boolean embeddedMode;
public ProxyServiceStarter(String[] args) throws Exception {
+ this(args, null, false);
+ }
+
+ public ProxyServiceStarter(String[] args, Consumer proxyConfigurationCustomizer,
+ boolean embeddedMode) throws Exception {
+ this.embeddedMode = embeddedMode;
try {
DateFormat dateFormat = new SimpleDateFormat(
FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
@@ -132,15 +140,26 @@ public ProxyServiceStarter(String[] args) throws Exception {
CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
cmd.addCommand("proxy", commander);
cmd.run(null);
- System.exit(0);
+ if (embeddedMode) {
+ return;
+ } else {
+ System.exit(0);
+ }
}
} catch (Exception e) {
commander.getErr().println(e);
- System.exit(1);
+ if (embeddedMode) {
+ return;
+ } else {
+ System.exit(1);
+ }
}
// load config file
config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
+ if (proxyConfigurationCustomizer != null) {
+ proxyConfigurationCustomizer.accept(config);
+ }
if (!isBlank(zookeeperServers)) {
// Use zookeeperServers from command line
@@ -230,7 +249,9 @@ public void start() throws Exception {
// create a web-service
server = new WebServer(config, authenticationService);
- Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+ if (!embeddedMode) {
+ Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+ }
proxyService.start();
@@ -293,7 +314,9 @@ public void close() {
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
- LogManager.shutdown();
+ if (!embeddedMode) {
+ LogManager.shutdown();
+ }
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index b95bbcab08b11..478b911eb23cf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -37,13 +37,18 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -93,12 +98,21 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
List connectors = new ArrayList<>();
HttpConfiguration httpConfig = new HttpConfiguration();
+ if (config.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
httpConfig.setOutputBufferSize(config.getHttpOutputBufferSize());
httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize());
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
if (config.getWebServicePort().isPresent()) {
this.externalServicePort = config.getWebServicePort().get();
- connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setHost(config.getBindAddress());
connector.setPort(externalServicePort);
connectors.add(connector);
@@ -133,7 +147,18 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
- connectorTls = new ServerConnector(server, sslCtxFactory, new HttpConnectionFactory(httpConfig));
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+ // this is needed for TLS authentication
+ if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ }
+ connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connectorTls.setPort(config.getWebServicePortTls().get());
connectorTls.setHost(config.getBindAddress());
connectors.add(connectorTls);
@@ -281,7 +306,10 @@ public int getExternalServicePort() {
public void start() throws Exception {
RequestLogHandler requestLogHandler = new RequestLogHandler();
- requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+ boolean showDetailedAddresses = config.getWebServiceLogDetailedAddresses() != null
+ ? config.getWebServiceLogDetailedAddresses() :
+ (config.isWebServiceHaProxyProtocolEnabled() || config.isWebServiceTrustXForwardedFor());
+ requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
new file mode 100644
index 0000000000000..b267439d47113
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import nl.altindag.console.ConsoleCaptor;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.assertj.core.api.ThrowingConsumer;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest {
+ static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
+ HttpClient httpClient;
+ ProxyServiceStarter serviceStarter;
+ String webServiceUrl;
+ String webServiceUrlTls;
+
+ @Override
+ @BeforeClass
+ protected void setup() throws Exception {
+ internalSetup();
+ serviceStarter = new ProxyServiceStarter(ARGS, proxyConfig -> {
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setWebServicePortTls(Optional.of(0));
+ proxyConfig.setTlsEnabledWithBroker(false);
+ proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
+ proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setWebSocketServiceEnabled(true);
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setClusterName(configClusterName);
+ proxyConfig.setWebServiceTrustXForwardedFor(true);
+ proxyConfig.setWebServiceHaProxyProtocolEnabled(true);
+ }, true);
+ serviceStarter.start();
+ webServiceUrl = "http://localhost:" + serviceStarter.getServer().getListenPortHTTP().get();
+ webServiceUrlTls = "https://localhost:" + serviceStarter.getServer().getListenPortHTTPS().get();
+ httpClient = new HttpClient(new SslContextFactory(true));
+ httpClient.start();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ if (serviceStarter != null) {
+ serviceStarter.close();
+ }
+ if (httpClient != null) {
+ httpClient.stop();
+ }
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setWebServiceTrustXForwardedFor(true);
+ }
+
+ @DataProvider(name = "tlsEnabled")
+ public Object[][] tlsEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
+ String url = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/admin/v2/brokers/leaderBroker";
+ performLoggingTest(consoleCaptor -> {
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(url)
+ .header("X-Forwarded-For", "11.22.33.44")
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("\"brokerId\":\"" + pulsar.getBrokerId() + "\""));
+
+ // Validate that the client IP passed in X-Forwarded-For is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("pulsar-external-web-") && line.contains("RequestLog")
+ && line.contains("R:11.22.33.44")), "Expected to find client IP in proxy logs");
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("pulsar-web-") && line.contains("RequestLog")
+ && line.contains("R:11.22.33.44")), "Expected to find client IP in broker logs");
+ });
+ }
+
+ @Test(dataProvider = "tlsEnabled")
+ public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
+ String url = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/admin/v2/brokers/leaderBroker";
+ performLoggingTest(consoleCaptor -> {
+ // Send a GET request to the metrics URL
+ ContentResponse response = httpClient.newRequest(url)
+ // Jetty client will add HA Proxy protocol header with the given IP to the request
+ .tag(new V2.Tag("99.22.33.44", 1234))
+ .send();
+
+ // Validate the response
+ assertTrue(response.getContentAsString().contains("\"brokerId\":\"" + pulsar.getBrokerId() + "\""));
+
+ // Validate that the client IP passed in HA proxy protocol is logged
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("pulsar-external-web-") && line.contains("RequestLog")
+ && line.contains("R:99.22.33.44")), "Expected to find client IP in proxy logs");
+ assertTrue(consoleCaptor.getStandardOutput().stream()
+ .anyMatch(line -> line.contains("pulsar-web-") && line.contains("RequestLog")
+ && line.contains("R:99.22.33.44")), "Expected to find client IP in broker logs");
+ });
+ }
+
+ void performLoggingTest(ThrowingConsumer testFunction) {
+ ConsoleCaptor consoleCaptor = new ConsoleCaptor();
+ try {
+ Awaitility.await().atMost(Duration.of(2, ChronoUnit.SECONDS)).untilAsserted(() -> {
+ consoleCaptor.clearOutput();
+ testFunction.accept(consoleCaptor);
+ });
+ } finally {
+ consoleCaptor.close();
+ System.out.println("--- Captured console output:");
+ consoleCaptor.getStandardOutput().forEach(System.out::println);
+ consoleCaptor.getErrorOutput().forEach(System.err::println);
+ System.out.println("--- End of captured console output");
+ }
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
index 3e598a57277a2..937526629acf0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
@@ -27,7 +27,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterT
@BeforeClass
protected void setup() throws Exception {
internalSetup();
- serviceStarter = new ProxyServiceStarter(ARGS);
+ serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setWebServicePort(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index f263286125353..0b9b6f17d1254 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -54,7 +54,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
@BeforeClass
protected void setup() throws Exception {
internalSetup();
- serviceStarter = new ProxyServiceStarter(ARGS);
+ serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setWebServicePort(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 61718bbac3ab0..770424d93747c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -55,7 +55,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
@BeforeClass
protected void setup() throws Exception {
internalSetup();
- serviceStarter = new ProxyServiceStarter(ARGS);
+ serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
diff --git a/pulsar-proxy/src/test/resources/log4j2.xml b/pulsar-proxy/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000000..261bd2edf6980
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/log4j2.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 7aed43d056c67..bbb34a3e3f73d 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -35,10 +35,17 @@
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
+import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.ProxyConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
@@ -73,10 +80,22 @@ public ProxyServer(WebSocketProxyConfiguration config)
if (config.getMaxHttpServerConnections() > 0) {
server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server));
}
+
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ if (config.isWebServiceTrustXForwardedFor()) {
+ httpConfig.addCustomizer(new ForwardedRequestCustomizer());
+ }
+ HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig);
+
List connectors = new ArrayList<>();
if (config.getWebServicePort().isPresent()) {
- connector = new ServerConnector(server);
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(httpConnectionFactory);
+ connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setPort(config.getWebServicePort().get());
connectors.add(connector);
}
@@ -111,7 +130,18 @@ public ProxyServer(WebSocketProxyConfiguration config)
config.getWebServiceTlsProtocols(),
config.getTlsCertRefreshCheckDurationSec());
}
- connectorTls = new ServerConnector(server, sslCtxFactory);
+ List connectionFactories = new ArrayList<>();
+ if (config.isWebServiceHaProxyProtocolEnabled()) {
+ connectionFactories.add(new ProxyConnectionFactory());
+ }
+ connectionFactories.add(new SslConnectionFactory(sslCtxFactory, httpConnectionFactory.getProtocol()));
+ connectionFactories.add(httpConnectionFactory);
+ // org.eclipse.jetty.server.AbstractConnectionFactory.getFactories contains similar logic
+ // this is needed for TLS authentication
+ if (httpConfig.getCustomizer(SecureRequestCustomizer.class) == null) {
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ }
+ connectorTls = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connectorTls.setPort(config.getWebServicePortTls().get());
connectors.add(connectorTls);
} catch (Exception e) {
@@ -169,7 +199,10 @@ public void start() throws PulsarServerException {
.map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
.collect(Collectors.joining(",")));
RequestLogHandler requestLogHandler = new RequestLogHandler();
- requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+ boolean showDetailedAddresses = conf.getWebServiceLogDetailedAddresses() != null
+ ? conf.getWebServiceLogDetailedAddresses() :
+ (conf.isWebServiceHaProxyProtocolEnabled() || conf.isWebServiceTrustXForwardedFor());
+ requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger(showDetailedAddresses, server));
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 7acfd4a64ad35..3fcbcf4b21567 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -96,6 +96,20 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
@FieldContext(doc = "Hostname or IP address the service binds on, default is 0.0.0.0.")
private String bindAddress = "0.0.0.0";
+ @FieldContext(doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ + "requests. Default is false.")
+ private boolean webServiceHaProxyProtocolEnabled = false;
+
+ @FieldContext(doc = "Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ + "Default is false.")
+ private boolean webServiceTrustXForwardedFor = false;
+
+ @FieldContext(doc =
+ "Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ + "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ + "is enabled.")
+ private Boolean webServiceLogDetailedAddresses;
+
@FieldContext(doc = "Maximum size of a text message during parsing in WebSocket proxy")
private int webSocketMaxTextFrameSize = 1024 * 1024;
// --- Authentication ---