Skip to content

Commit

Permalink
[improve][proxy] Reuse authentication instance in pulsar-proxy (apach…
Browse files Browse the repository at this point in the history
…e#23113)

(cherry picked from commit 3e461c0)
(cherry picked from commit b805a4a)
equanz authored and srinath-ctds committed Aug 20, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent b46125e commit b04b905
Showing 41 changed files with 581 additions and 162 deletions.
Original file line number Diff line number Diff line change
@@ -258,7 +258,11 @@ void testAuthentication() throws Exception {
proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
@Cleanup
final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);

proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
@@ -40,7 +39,6 @@
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {

private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
private final Authentication proxyClientAuthentication;
private final String brokerWebServiceUrl;
private final String functionWorkerWebServiceUrl;

AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) {
this.config = config;
this.discoveryProvider = discoveryProvider;
this.proxyClientAuthentication = proxyClientAuthentication;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS()
@@ -256,22 +257,13 @@ protected ContentProvider proxyRequestContent(HttpServletRequest request,
@Override
protected HttpClient newHttpClient() {
try {
Authentication auth = AuthenticationFactory.create(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()
);

Objects.requireNonNull(auth, "No supported auth found for proxy");

auth.start();

if (config.isTlsEnabledWithBroker()) {
try {
X509Certificate[] trustCertificates = SecurityUtility
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());

SSLContext sslCtx;
AuthenticationDataProvider authData = auth.getAuthData();
AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData();
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -311,11 +303,6 @@ protected HttpClient newHttpClient() {
return new JettyHttpClient(contextFactory);
} catch (Exception e) {
LOG.error("new jetty http client exception ", e);
try {
auth.close();
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
@@ -114,8 +113,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)

if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
try {
authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).getAuthData();
authData = authentication.getAuthData();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Original file line number Diff line number Diff line change
@@ -63,8 +63,6 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -152,7 +150,8 @@ public class ProxyService implements Closeable {
private final ConnectionController connectionController;

public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws Exception {
AuthenticationService authenticationService,
Authentication proxyClientAuthentication) throws Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
this.clientCnxs = Sets.newConcurrentHashSet();
@@ -201,12 +200,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
this.proxyClientAuthentication = proxyClientAuthentication;
this.connectionController = new ConnectionController.DefaultConnectionController(
proxyConfig.getMaxConcurrentInboundConnections(),
proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
Original file line number Diff line number Diff line change
@@ -31,11 +31,13 @@
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.function.Consumer;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
@@ -45,6 +47,10 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -99,6 +105,9 @@ public class ProxyServiceStarter {

private ProxyConfiguration config;

@Getter
private Authentication proxyClientAuthentication;

@Getter
private ProxyService proxyService;

@@ -239,8 +248,27 @@ public static void main(String[] args) throws Exception {
public void start() throws Exception {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));

if (config.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy");
try {
proxyClientAuthentication.start();
} catch (Exception e) {
try {
proxyClientAuthentication.close();
} catch (IOException ioe) {
log.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}

// create proxy service
proxyService = new ProxyService(config, authenticationService);
proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication);
// create a web-service
server = new WebServer(config, authenticationService);

@@ -287,7 +315,8 @@ public double get() {
metricsInitialized = true;
}

addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(),
proxyClientAuthentication);

// start web-service
server.start();
@@ -301,6 +330,9 @@ public void close() {
if (server != null) {
server.stop();
}
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
@@ -311,9 +343,10 @@ public void close() {
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) throws Exception {
// We can make 'status.html' publicly accessible without authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
@@ -330,7 +363,8 @@ public static void addWebServerHandlers(WebServer server,
}
}

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider,
proxyClientAuthentication);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -121,6 +123,7 @@ public void close() {
private ProxyService proxyService;
private boolean useSeparateThreadPoolForProxyExtensions;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private Authentication proxyClientAuthentication;

public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
@@ -141,8 +144,12 @@ protected void setup() throws Exception {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);

proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();

@@ -172,6 +179,9 @@ public void testBootstrapProtocolHandler() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}

if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
Original file line number Diff line number Diff line change
@@ -18,11 +18,18 @@
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -34,18 +41,13 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTest {


private final ProxyConfiguration proxyConfig = new ProxyConfiguration();

private Authentication proxyClientAuthentication;

private WebServer webServer;

private BrokerDiscoveryProvider discoveryProvider;
@@ -103,12 +105,16 @@ protected void setup() throws Exception {

resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
new ZKMetadataStore(mockZooKeeperGlobal));
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
webServer.start();
@@ -118,6 +124,9 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
webServer.stop();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
super.internalCleanup();
}

Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.Authentication;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
@@ -46,7 +47,7 @@ public void setupMocks() throws ServletException {
// given
HttpClient httpClient = mock(HttpClient.class);
adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
mock(BrokerDiscoveryProvider.class)) {
mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) {
@Override
protected HttpClient createHttpClient() throws ServletException {
return httpClient;
Loading

0 comments on commit b04b905

Please sign in to comment.