Skip to content

Commit

Permalink
[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for r…
Browse files Browse the repository at this point in the history
…esolving original client IP of http/https requests (apache#22524)
  • Loading branch information
lhotari authored Apr 22, 2024
1 parent 21647a1 commit 4a88721
Show file tree
Hide file tree
Showing 25 changed files with 835 additions and 22 deletions.
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=

Expand Down
10 changes: 10 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled: false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor: false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses: null

# The Configuration metadata store url
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
Expand Down
10 changes: 10 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
Expand Down
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
10 changes: 10 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ statusFilePath=
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Name of the pulsar cluster to connect to
clusterName=

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.</description>
<jettison.version>1.5.4</jettison.version>
<woodstox.version>5.4.0</woodstox.version>
<wiremock.version>2.33.2</wiremock.version>
<consolecaptor.version>1.0.3</consolecaptor.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ "is enabled.")
private Boolean webServiceLogDetailedAddresses;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,23 @@
*/
package org.apache.pulsar.broker.web;

import java.net.InetSocketAddress;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.component.ContainerLifeCycle;

/**
* Class to standardize initialization of a Jetty request logger for all pulsar components.
Expand Down Expand Up @@ -58,7 +72,184 @@ public class JettyRequestLogFactory {
* Build a new Jetty request logger using the format defined in this class.
* @return a request logger
*/
public static CustomRequestLog createRequestLogger() {
return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
public static RequestLog createRequestLogger() {
return createRequestLogger(false, null);
}

/**
* Build a new Jetty request logger using the format defined in this class.
* @param showDetailedAddresses whether to show detailed addresses and ports in logs
* @return a request logger
*/
public static RequestLog createRequestLogger(boolean showDetailedAddresses, Server server) {
if (!showDetailedAddresses) {
return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
} else {
return new OriginalClientIPRequestLog(server);
}
}

/**
* Logs the original and real remote (client) and local (server) IP addresses
* when detailed addresses are enabled.
* Tracks the real addresses of remote and local using a registered Connection.Listener
* when detailed addresses are enabled.
* This is necessary when Proxy Protocol is used to pass the original client IP.
*/
@Slf4j
private static class OriginalClientIPRequestLog extends ContainerLifeCycle implements RequestLog {
private final ThreadLocal<StringBuilder> requestLogStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
private final CustomRequestLog delegate;
private final Slf4jRequestLogWriter delegateLogWriter;

OriginalClientIPRequestLog(Server server) {
delegate = new CustomRequestLog(this::write, LOG_FORMAT);
addBean(delegate);
delegateLogWriter = new Slf4jRequestLogWriter();
addBean(delegateLogWriter);
if (server != null) {
for (Connector connector : server.getConnectors()) {
// adding the listener is only necessary for connectors that use ProxyConnectionFactory
if (connector.getDefaultConnectionFactory() instanceof ProxyConnectionFactory) {
connector.addBean(proxyProtocolOriginalEndpointListener);
}
}
}
}

void write(String requestEntry) {
StringBuilder sb = requestLogStringBuilder.get();
sb.setLength(0);
sb.append(requestEntry);
}

@Override
public void log(Request request, Response response) {
delegate.log(request, response);
StringBuilder sb = requestLogStringBuilder.get();
sb.append(" [R:");
sb.append(request.getRemoteHost());
sb.append(':');
sb.append(request.getRemotePort());
InetSocketAddress realRemoteAddress = lookupRealAddress(request.getHttpChannel().getRemoteAddress());
if (realRemoteAddress != null) {
String realRemoteHost = HostPort.normalizeHost(realRemoteAddress.getHostString());
int realRemotePort = realRemoteAddress.getPort();
if (!realRemoteHost.equals(request.getRemoteHost()) || realRemotePort != request.getRemotePort()) {
sb.append(" via ");
sb.append(realRemoteHost);
sb.append(':');
sb.append(realRemotePort);
}
}
sb.append("]->[L:");
InetSocketAddress realLocalAddress = lookupRealAddress(request.getHttpChannel().getLocalAddress());
if (realLocalAddress != null) {
String realLocalHost = HostPort.normalizeHost(realLocalAddress.getHostString());
int realLocalPort = realLocalAddress.getPort();
sb.append(realLocalHost);
sb.append(':');
sb.append(realLocalPort);
if (!realLocalHost.equals(request.getLocalAddr()) || realLocalPort != request.getLocalPort()) {
sb.append(" dst ");
sb.append(request.getLocalAddr());
sb.append(':');
sb.append(request.getLocalPort());
}
} else {
sb.append(request.getLocalAddr());
sb.append(':');
sb.append(request.getLocalPort());
}
sb.append(']');
try {
delegateLogWriter.write(sb.toString());
} catch (Exception e) {
log.warn("Failed to write request log", e);
}
}

private InetSocketAddress lookupRealAddress(InetSocketAddress socketAddress) {
if (socketAddress == null) {
return null;
}
if (proxyProtocolRealAddressMapping.isEmpty()) {
return socketAddress;
}
AddressEntry entry = proxyProtocolRealAddressMapping.get(new AddressKey(socketAddress.getHostString(),
socketAddress.getPort()));
if (entry != null) {
return entry.realAddress;
} else {
return socketAddress;
}
}

private final Connection.Listener proxyProtocolOriginalEndpointListener =
new ProxyProtocolOriginalEndpointListener();

private final ConcurrentHashMap<AddressKey, AddressEntry> proxyProtocolRealAddressMapping =
new ConcurrentHashMap<>();

// Use a record as key since InetSocketAddress hash code changes if the address gets resolved
record AddressKey(String hostString, int port) {

}

record AddressEntry(InetSocketAddress realAddress, AtomicInteger referenceCount) {

}

// Tracks the real addresses of remote and local when detailed addresses are enabled.
// This is necessary when Proxy Protocol is used to pass the original client IP.
// The Proxy Protocol implementation in Jetty wraps the original endpoint with a ProxyEndPoint
// and the real endpoint information isn't available in the request object.
// This listener is added to all connectors to track the real addresses of the client and server.
class ProxyProtocolOriginalEndpointListener implements Connection.Listener {
@Override
public void onOpened(Connection connection) {
handleConnection(connection, true);
}

@Override
public void onClosed(Connection connection) {
handleConnection(connection, false);
}

private void handleConnection(Connection connection, boolean increment) {
if (connection.getEndPoint() instanceof ProxyConnectionFactory.ProxyEndPoint) {
ProxyConnectionFactory.ProxyEndPoint proxyEndPoint =
(ProxyConnectionFactory.ProxyEndPoint) connection.getEndPoint();
EndPoint originalEndpoint = proxyEndPoint.unwrap();
mapAddress(proxyEndPoint.getLocalAddress(), originalEndpoint.getLocalAddress(), increment);
mapAddress(proxyEndPoint.getRemoteAddress(), originalEndpoint.getRemoteAddress(), increment);
}
}

private void mapAddress(InetSocketAddress current, InetSocketAddress real, boolean increment) {
// don't add the mapping if the current address is the same as the real address
if (real != null && current != null && current.equals(real)) {
return;
}
AddressKey key = new AddressKey(current.getHostString(), current.getPort());
proxyProtocolRealAddressMapping.compute(key, (__, entry) -> {
if (entry == null) {
if (increment) {
entry = new AddressEntry(real, new AtomicInteger(1));
}
} else {
if (increment) {
entry.referenceCount.incrementAndGet();
} else {
if (entry.referenceCount.decrementAndGet() == 0) {
// remove the entry if the reference count drops to 0
entry = null;
}
}
}
return entry;
});
}
}
}
}
7 changes: 7 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>consolecaptor</artifactId>
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
Expand Down
Loading

0 comments on commit 4a88721

Please sign in to comment.