Skip to content

Commit

Permalink
[Broker] Support disabling non-TLS service ports (apache#11681)
Browse files Browse the repository at this point in the history
* Support disabling non-tls service ports

* Add docs for disabling non-TLS ports

* Update site2/docs/security-tls-keystore.md

Co-authored-by: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
(cherry picked from commit 50b6e79)
  • Loading branch information
lhotari committed Aug 19, 2021
1 parent 54d8f1e commit fc43a00
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1251,12 +1251,15 @@ private void startWorkerService(AuthenticationService authenticationService,
LOG.info("Starting function worker service");

WorkerConfig workerConfig = functionWorkerService.get().getWorkerConfig();
if (workerConfig.isUseTls()) {
if (workerConfig.isUseTls() || brokerServiceUrl == null) {
workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
}
if (workerConfig.isUseTls() || webServiceAddress == null) {
workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceAddress);
workerConfig.setFunctionWebServiceUrl(webServiceAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void initialize(PulsarService pulsar) {

@Override
public void start() throws PulsarServerException {
lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get();
lookupServiceAddress = getBrokerAddress();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
zkClient = pulsar.getZkClient();
Expand Down Expand Up @@ -93,6 +93,13 @@ public void start() throws PulsarServerException {
}
}

private String getBrokerAddress() {
return String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().isPresent()
? pulsar.getConfiguration().getWebServicePort().get()
: pulsar.getConfiguration().getWebServicePortTls().get());
}

@Override
public boolean isCentralized() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ private String getBrokerAddress() {
return String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().isPresent()
? pulsar.getConfiguration().getWebServicePort().get()
: pulsar.getConfiguration().getWebServicePortTls());
: pulsar.getConfiguration().getWebServicePortTls().get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -221,7 +222,7 @@ protected final void internalCleanup() throws Exception {
}
bkExecutor = null;
}

}

protected abstract void setup() throws Exception;
Expand All @@ -246,13 +247,16 @@ protected void startBroker() throws Exception {
}
this.pulsar = startBroker(conf);

brokerUrl = new URL(pulsar.getWebServiceAddress());
brokerUrlTls = new URL(pulsar.getWebServiceAddressTls());
brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;

if (admin != null) {
admin.close();
}
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString()
: brokerUrlTls.toString());
admin = spy(pulsarAdminBuilder.build());
}

protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,38 @@ public void testTlsEnabled() throws Exception {
}
}

@Test
public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
final String subName = "newSub";

conf.setAuthenticationEnabled(false);
conf.setBrokerServicePort(Optional.empty());
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.empty());
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

// Access with TLS (Allow insecure TLS connection)
try {
pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
.allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();

} catch (Exception e) {
fail("should not fail");
} finally {
pulsarClient.close();
}
}

@SuppressWarnings("deprecation")
@Test
public void testTlsAuthAllowInsecure() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public void testOptionalSettingPresent() throws Exception {
assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5.0));
}

@Test
public void testServicePortsEmpty() throws Exception {
String confFile = "brokerServicePort=\nwebServicePort=\n";
InputStream stream = new ByteArrayInputStream(confFile.getBytes());
final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
assertEquals(config.getBrokerServicePort(), Optional.empty());
assertEquals(config.getWebServicePort(), Optional.empty());
}

/**
* test {@link ServiceConfiguration} with incorrect values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;

import java.util.stream.Collectors;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
Expand Down Expand Up @@ -121,7 +123,9 @@ public void addRestResources(String basePath, String javaPackages, String attrib
}

public void start() throws PulsarServerException {
log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get());
log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors())
.map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
.collect(Collectors.joining(",")));
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
Expand Down
13 changes: 13 additions & 0 deletions site2/docs/security-tls-keystore.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ brokerClientTlsTrustStorePassword=clientpw

NOTE: it is important to restrict access to the store files via filesystem permissions.

If you have configured TLS on the broker, to disable non-TLS ports, you can set the values of the following configurations to empty as below.
```
brokerServicePort=
webServicePort=
```
In this case, you need to set the following configurations.

```conf
brokerClientTlsEnabled=true // Set this to true
brokerClientTlsEnabledWithKeyStore=true // Set this to true
brokerClientTlsTrustStore= // Set this to your desired value
brokerClientTlsTrustStorePassword= // Set this to your desired value
Optional settings that may worth consider:
1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end
Expand Down

0 comments on commit fc43a00

Please sign in to comment.