Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests #22524

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Number of threads to config Netty Acceptor. Default is 1
numAcceptorThreads=

Expand Down
10 changes: 10 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled: false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor: false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses: null

# The Configuration metadata store url
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
Expand Down
10 changes: 10 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Enables zero-copy transport of data across network interfaces using the splice system call.
# Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.
proxyZeroCopyModeEnabled=true
Expand Down
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ advertisedAddress=
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numIOThreads=

Expand Down
10 changes: 10 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ statusFilePath=
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
webServiceTrustXForwardedFor=false

# Add detailed client/remote and server/local addresses and ports to http/https request logging.
# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
webServiceLogDetailedAddresses=

# Name of the pulsar cluster to connect to
clusterName=

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ flexible messaging model and an intuitive client API.</description>
<jettison.version>1.5.4</jettison.version>
<woodstox.version>5.4.0</woodstox.version>
<wiremock.version>2.33.2</wiremock.version>
<consolecaptor.version>1.0.3</consolecaptor.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " when getting topic statistics data.")
private boolean haProxyProtocolEnabled;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the use of HA proxy protocol for resolving the client IP for http/https "
+ "requests. Default is false.")
private boolean webServiceHaProxyProtocolEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Trust X-Forwarded-For header for resolving the client IP for http/https requests.\n"
+ "Default is false.")
private boolean webServiceTrustXForwardedFor = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"Add detailed client/remote and server/local addresses and ports to http/https request logging.\n"
+ "Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor "
+ "is enabled.")
private Boolean webServiceLogDetailedAddresses;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for Netty Acceptor."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,23 @@
*/
package org.apache.pulsar.broker.web;

import java.net.InetSocketAddress;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.component.ContainerLifeCycle;

/**
* Class to standardize initialization of a Jetty request logger for all pulsar components.
Expand Down Expand Up @@ -58,7 +72,184 @@ public class JettyRequestLogFactory {
* Build a new Jetty request logger using the format defined in this class.
* @return a request logger
*/
public static CustomRequestLog createRequestLogger() {
return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
public static RequestLog createRequestLogger() {
return createRequestLogger(false, null);
}

/**
* Build a new Jetty request logger using the format defined in this class.
* @param showDetailedAddresses whether to show detailed addresses and ports in logs
* @return a request logger
*/
public static RequestLog createRequestLogger(boolean showDetailedAddresses, Server server) {
if (!showDetailedAddresses) {
return new CustomRequestLog(new Slf4jRequestLogWriter(), LOG_FORMAT);
} else {
return new OriginalClientIPRequestLog(server);
}
}

/**
* Logs the original and real remote (client) and local (server) IP addresses
* when detailed addresses are enabled.
* Tracks the real addresses of remote and local using a registered Connection.Listener
* when detailed addresses are enabled.
* This is necessary when Proxy Protocol is used to pass the original client IP.
*/
@Slf4j
private static class OriginalClientIPRequestLog extends ContainerLifeCycle implements RequestLog {
private final ThreadLocal<StringBuilder> requestLogStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
private final CustomRequestLog delegate;
private final Slf4jRequestLogWriter delegateLogWriter;

OriginalClientIPRequestLog(Server server) {
delegate = new CustomRequestLog(this::write, LOG_FORMAT);
addBean(delegate);
delegateLogWriter = new Slf4jRequestLogWriter();
addBean(delegateLogWriter);
if (server != null) {
for (Connector connector : server.getConnectors()) {
// adding the listener is only necessary for connectors that use ProxyConnectionFactory
if (connector.getDefaultConnectionFactory() instanceof ProxyConnectionFactory) {
connector.addBean(proxyProtocolOriginalEndpointListener);
}
}
}
}

void write(String requestEntry) {
StringBuilder sb = requestLogStringBuilder.get();
sb.setLength(0);
sb.append(requestEntry);
}

@Override
public void log(Request request, Response response) {
delegate.log(request, response);
StringBuilder sb = requestLogStringBuilder.get();
sb.append(" [R:");
sb.append(request.getRemoteHost());
sb.append(':');
sb.append(request.getRemotePort());
InetSocketAddress realRemoteAddress = lookupRealAddress(request.getHttpChannel().getRemoteAddress());
if (realRemoteAddress != null) {
String realRemoteHost = HostPort.normalizeHost(realRemoteAddress.getHostString());
int realRemotePort = realRemoteAddress.getPort();
if (!realRemoteHost.equals(request.getRemoteHost()) || realRemotePort != request.getRemotePort()) {
sb.append(" via ");
sb.append(realRemoteHost);
sb.append(':');
sb.append(realRemotePort);
}
}
sb.append("]->[L:");
InetSocketAddress realLocalAddress = lookupRealAddress(request.getHttpChannel().getLocalAddress());
if (realLocalAddress != null) {
String realLocalHost = HostPort.normalizeHost(realLocalAddress.getHostString());
int realLocalPort = realLocalAddress.getPort();
sb.append(realLocalHost);
sb.append(':');
sb.append(realLocalPort);
if (!realLocalHost.equals(request.getLocalAddr()) || realLocalPort != request.getLocalPort()) {
sb.append(" dst ");
sb.append(request.getLocalAddr());
sb.append(':');
sb.append(request.getLocalPort());
}
} else {
sb.append(request.getLocalAddr());
sb.append(':');
sb.append(request.getLocalPort());
}
sb.append(']');
try {
delegateLogWriter.write(sb.toString());
} catch (Exception e) {
log.warn("Failed to write request log", e);
}
}

private InetSocketAddress lookupRealAddress(InetSocketAddress socketAddress) {
if (socketAddress == null) {
return null;
}
if (proxyProtocolRealAddressMapping.isEmpty()) {
return socketAddress;
}
AddressEntry entry = proxyProtocolRealAddressMapping.get(new AddressKey(socketAddress.getHostString(),
socketAddress.getPort()));
if (entry != null) {
return entry.realAddress;
} else {
return socketAddress;
}
}

private final Connection.Listener proxyProtocolOriginalEndpointListener =
new ProxyProtocolOriginalEndpointListener();

private final ConcurrentHashMap<AddressKey, AddressEntry> proxyProtocolRealAddressMapping =
new ConcurrentHashMap<>();

// Use a record as key since InetSocketAddress hash code changes if the address gets resolved
record AddressKey(String hostString, int port) {

}

record AddressEntry(InetSocketAddress realAddress, AtomicInteger referenceCount) {

}

// Tracks the real addresses of remote and local when detailed addresses are enabled.
// This is necessary when Proxy Protocol is used to pass the original client IP.
// The Proxy Protocol implementation in Jetty wraps the original endpoint with a ProxyEndPoint
// and the real endpoint information isn't available in the request object.
// This listener is added to all connectors to track the real addresses of the client and server.
class ProxyProtocolOriginalEndpointListener implements Connection.Listener {
@Override
public void onOpened(Connection connection) {
handleConnection(connection, true);
}

@Override
public void onClosed(Connection connection) {
handleConnection(connection, false);
}

private void handleConnection(Connection connection, boolean increment) {
if (connection.getEndPoint() instanceof ProxyConnectionFactory.ProxyEndPoint) {
ProxyConnectionFactory.ProxyEndPoint proxyEndPoint =
(ProxyConnectionFactory.ProxyEndPoint) connection.getEndPoint();
EndPoint originalEndpoint = proxyEndPoint.unwrap();
mapAddress(proxyEndPoint.getLocalAddress(), originalEndpoint.getLocalAddress(), increment);
mapAddress(proxyEndPoint.getRemoteAddress(), originalEndpoint.getRemoteAddress(), increment);
}
}

private void mapAddress(InetSocketAddress current, InetSocketAddress real, boolean increment) {
// don't add the mapping if the current address is the same as the real address
if (real != null && current != null && current.equals(real)) {
return;
}
AddressKey key = new AddressKey(current.getHostString(), current.getPort());
proxyProtocolRealAddressMapping.compute(key, (__, entry) -> {
if (entry == null) {
if (increment) {
entry = new AddressEntry(real, new AtomicInteger(1));
}
} else {
if (increment) {
entry.referenceCount.incrementAndGet();
} else {
if (entry.referenceCount.decrementAndGet() == 0) {
// remove the entry if the reference count drops to 0
entry = null;
}
}
}
return entry;
});
}
}
}
}
7 changes: 7 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>consolecaptor</artifactId>
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
Expand Down
Loading
Loading