From 7df17247f5eeefba02c29bfe49f559ad5b060ca5 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 18 Apr 2024 20:18:38 +0300 Subject: [PATCH] Prevent shutdown hook leak in ProxyServiceStarterTest --- pulsar-proxy/pom.xml | 6 + .../proxy/server/ProxyServiceStarter.java | 20 ++- .../server/ProxyOriginalClientIPTest.java | 146 ++++++++++++++++++ .../proxy/server/ProxyServiceStarterTest.java | 1 + pulsar-proxy/src/test/resources/log4j2.xml | 36 +++++ 5 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java create mode 100644 pulsar-proxy/src/test/resources/log4j2.xml diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 64ca301facf4d1..a30e23b8d47812 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -209,6 +209,12 @@ ${wiremock.version} test + + io.github.hakky54 + consolecaptor + ${consolecaptor.version} + test + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 50a8e3ab7d7532..e30d09157fc074 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -35,7 +35,9 @@ import java.util.Collections; import java.util.Date; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import lombok.Getter; +import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.util.datetime.FixedDateFormat; import org.apache.pulsar.broker.PulsarServerException; @@ -109,8 +111,15 @@ public class ProxyServiceStarter { private WebServer server; private WebSocketService webSocketService; private static boolean metricsInitialized; + @Setter + private boolean embeddedMode = false; public ProxyServiceStarter(String[] args) throws Exception { + this(args, null); + } + + public ProxyServiceStarter(String[] args, Consumer proxyConfigurationCustomizer) + throws Exception { try { DateFormat dateFormat = new SimpleDateFormat( FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern()); @@ -141,6 +150,9 @@ public ProxyServiceStarter(String[] args) throws Exception { // load config file config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class); + if (proxyConfigurationCustomizer != null) { + proxyConfigurationCustomizer.accept(config); + } if (!isBlank(zookeeperServers)) { // Use zookeeperServers from command line @@ -230,7 +242,9 @@ public void start() throws Exception { // create a web-service server = new WebServer(config, authenticationService); - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + if (!embeddedMode) { + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + } proxyService.start(); @@ -293,7 +307,9 @@ public void close() { } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); } finally { - LogManager.shutdown(); + if (!embeddedMode) { + LogManager.shutdown(); + } } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java new file mode 100644 index 00000000000000..48b78c035d3a88 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import nl.altindag.console.ConsoleCaptor; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.awaitility.Awaitility; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.ProxyProtocolClientConnectionFactory.V2; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest { + static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; + HttpClient httpClient; + ProxyServiceStarter serviceStarter; + String webServiceUrl; + String webServiceUrlTls; + + @Override + @BeforeClass + protected void setup() throws Exception { + internalSetup(); + serviceStarter = new ProxyServiceStarter(ARGS, proxyConfig -> { + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setWebServicePortTls(Optional.of(0)); + proxyConfig.setTlsEnabledWithBroker(false); + proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH); + proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebSocketServiceEnabled(true); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setClusterName(configClusterName); + proxyConfig.setWebServiceTrustXForwardedFor(true); + proxyConfig.setWebServiceHaProxyProtocolEnabled(true); + }); + serviceStarter.setEmbeddedMode(true); + serviceStarter.start(); + webServiceUrl = "http://localhost:" + serviceStarter.getServer().getListenPortHTTP().get(); + webServiceUrlTls = "https://localhost:" + serviceStarter.getServer().getListenPortHTTPS().get(); + httpClient = new HttpClient(new SslContextFactory(true)); + httpClient.start(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + if (serviceStarter != null) { + serviceStarter.close(); + } + if (httpClient != null) { + httpClient.stop(); + } + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setWebServiceTrustXForwardedFor(true); + } + + @DataProvider(name = "tlsEnabled") + public Object[][] tlsEnabled() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "tlsEnabled") + public void testClientIPIsPickedFromXForwardedForHeaderAndLogged(boolean tlsEnabled) throws Exception { + String metricsUrl = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/metrics/"; + ConsoleCaptor consoleCaptor = new ConsoleCaptor(); + try { + Awaitility.await().untilAsserted(() -> { + consoleCaptor.clearOutput(); + + // Send a GET request to the metrics URL + ContentResponse response = httpClient.newRequest(metricsUrl) + .header("X-Forwarded-For", "11.22.33.44") + .send(); + + // Validate the response + assertTrue(response.getContentAsString().contains("process_cpu_seconds_total")); + + // Validate that the client IP passed in HA Proxy protocol is logged + assertTrue(consoleCaptor.getStandardOutput().stream() + .anyMatch(line -> line.contains("RequestLog") && line.contains("- 11.22.33.44"))); + }); + } finally { + consoleCaptor.close(); + } + } + + + @Test(dataProvider = "tlsEnabled") + public void testClientIPIsPickedFromHAProxyProtocolAndLogged(boolean tlsEnabled) throws Exception { + String metricsUrl = (tlsEnabled ? webServiceUrlTls : webServiceUrl) + "/metrics/"; + ConsoleCaptor consoleCaptor = new ConsoleCaptor(); + try { + Awaitility.await().untilAsserted(() -> { + consoleCaptor.clearOutput(); + + // Send a GET request to the metrics URL + ContentResponse response = httpClient.newRequest(metricsUrl) + // Jetty client will add HA Proxy protocol header with the given IP to the request + .tag(new V2.Tag("99.22.33.44", 1234)) + .send(); + + // Validate the response + assertTrue(response.getContentAsString().contains("process_cpu_seconds_total")); + + // Validate that the client IP passed in HA Proxy protocol is logged + assertTrue(consoleCaptor.getStandardOutput().stream() + .anyMatch(line -> line.contains("RequestLog") && line.contains("- 99.22.33.44"))); + }); + } finally { + consoleCaptor.close(); + } + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index f2632861253533..e28dbb459c7d7a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -62,6 +62,7 @@ protected void setup() throws Exception { serviceStarter.getConfig().setWebSocketServiceEnabled(true); serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); serviceStarter.getConfig().setClusterName(configClusterName); + serviceStarter.setEmbeddedMode(true); serviceStarter.start(); serviceUrl = serviceStarter.getProxyService().getServiceUrl(); } diff --git a/pulsar-proxy/src/test/resources/log4j2.xml b/pulsar-proxy/src/test/resources/log4j2.xml new file mode 100644 index 00000000000000..261bd2edf69809 --- /dev/null +++ b/pulsar-proxy/src/test/resources/log4j2.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + +