Skip to content

Commit

Permalink
Prevent shutdown hook leak in ProxyServiceStarterTest
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Apr 18, 2024
1 parent 86d0800 commit 7df1724
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>consolecaptor</artifactId>
<version>${consolecaptor.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -109,8 +111,15 @@ public class ProxyServiceStarter {
private WebServer server;
private WebSocketService webSocketService;
private static boolean metricsInitialized;
@Setter
private boolean embeddedMode = false;

public ProxyServiceStarter(String[] args) throws Exception {
this(args, null);
}

public ProxyServiceStarter(String[] args, Consumer<ProxyConfiguration> proxyConfigurationCustomizer)
throws Exception {
try {
DateFormat dateFormat = new SimpleDateFormat(
FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
Expand Down Expand Up @@ -141,6 +150,9 @@ public ProxyServiceStarter(String[] args) throws Exception {

// load config file
config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
if (proxyConfigurationCustomizer != null) {
proxyConfigurationCustomizer.accept(config);
}

if (!isBlank(zookeeperServers)) {
// Use zookeeperServers from command line
Expand Down Expand Up @@ -230,7 +242,9 @@ public void start() throws Exception {
// create a web-service
server = new WebServer(config, authenticationService);

Runtime.getRuntime().addShutdownHook(new Thread(this::close));
if (!embeddedMode) {
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
}

proxyService.start();

Expand Down Expand Up @@ -293,7 +307,9 @@ public void close() {
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
LogManager.shutdown();
if (!embeddedMode) {
LogManager.shutdown();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static org.testng.Assert.assertTrue;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import nl.altindag.console.ConsoleCaptor;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.awaitility.Awaitility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest {
static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
HttpClient httpClient;
ProxyServiceStarter serviceStarter;
String webServiceUrl;
String webServiceUrlTls;

@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS, proxyConfig -> {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(false);
proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setWebSocketServiceEnabled(true);
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setClusterName(configClusterName);
proxyConfig.setWebServiceTrustXForwardedFor(true);
proxyConfig.setWebServiceHaProxyProtocolEnabled(true);
});
serviceStarter.setEmbeddedMode(true);
serviceStarter.start();
webServiceUrl = "http://localhost:" + serviceStarter.getServer().getListenPortHTTP().get();
webServiceUrlTls = "https://localhost:" + serviceStarter.getServer().getListenPortHTTPS().get();
httpClient = new HttpClient(new SslContextFactory(true));
httpClient.start();
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
if (serviceStarter != null) {
serviceStarter.close();
}
if (httpClient != null) {
httpClient.stop();
}
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setWebServiceTrustXForwardedFor(true);
}

@DataProvider(name = "tlsEnabled")
public Object[][] tlsEnabled() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
.header("X-Forwarded-For", "11.22.33.44")
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 11.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}


@Test(dataProvider = "tlsEnabled")
public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception {
String metricsUrl = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/metrics/";
ConsoleCaptor consoleCaptor = new ConsoleCaptor();
try {
Awaitility.await().untilAsserted(() -> {
consoleCaptor.clearOutput();

// Send a GET request to the metrics URL
ContentResponse response = httpClient.newRequest(metricsUrl)
// Jetty client will add HA Proxy protocol header with the given IP to the request
.tag(new V2.Tag("99.22.33.44", 1234))
.send();

// Validate the response
assertTrue(response.getContentAsString().contains("process_cpu_seconds_total"));

// Validate that the client IP passed in HA Proxy protocol is logged
assertTrue(consoleCaptor.getStandardOutput().stream()
.anyMatch(line -> line.contains("RequestLog") && line.contains("- 99.22.33.44")));
});
} finally {
consoleCaptor.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ protected void setup() throws Exception {
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.setEmbeddedMode(true);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrl();
}
Expand Down
36 changes: 36 additions & 0 deletions pulsar-proxy/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
<Appenders>
<!-- setting follow="true" is required for using ConsoleCaptor to validate log messages -->
<Console name="CONSOLE" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="CONSOLE"/>
</Root>
</Loggers>
</Configuration>

0 comments on commit 7df1724

Please sign in to comment.