Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-common] add option that field can be empty #3543

Merged
merged 1 commit into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -225,8 +226,8 @@ void testAuthentication() throws Exception {

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(servicePort);
proxyConfig.setWebServicePort(webServicePort);
proxyConfig.setServicePort(Optional.of(servicePort));
proxyConfig.setWebServicePort(Optional.of(webServicePort));
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf requests"
)
private Integer brokerServicePort = 6650;
private Optional<Integer> brokerServicePort = Optional.of(6650);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving tls secured binary protobuf requests"
)
private Integer brokerServicePortTls = null;
private Optional<Integer> brokerServicePortTls = Optional.empty();
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving http requests"
)
private Integer webServicePort = 8080;
private Optional<Integer> webServicePort = Optional.of(8080);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving https requests"
)
private Integer webServicePortTls = null;
private Optional<Integer> webServicePortTls = Optional.empty();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1248,18 +1248,18 @@ public int getBookkeeperHealthCheckIntervalSec() {
}

public Optional<Integer> getBrokerServicePort() {
return Optional.ofNullable(brokerServicePort);
return brokerServicePort;
}

public Optional<Integer> getBrokerServicePortTls() {
return Optional.ofNullable(brokerServicePortTls);
return brokerServicePortTls;
}

public Optional<Integer> getWebServicePort() {
return Optional.ofNullable(webServicePort);
return webServicePort;
}

public Optional<Integer> getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
return webServicePortTls;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -32,6 +33,7 @@
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Optional;
import java.util.Properties;

import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -44,10 +46,10 @@ public class MockConfiguration implements PulsarConfiguration {

private String zookeeperServers = "localhost:2181";
private String configurationStoreServers = "localhost:2184";
private int brokerServicePort = 7650;
private int brokerServicePortTls = 7651;
private int webServicePort = 9080;
private int webServicePortTls = 9443;
private Optional<Integer> brokerServicePort = Optional.of(7650);
private Optional<Integer> brokerServicePortTls = Optional.of(7651);
private Optional<Integer> webServicePort = Optional.of(9080);
private Optional<Integer> webServicePortTls = Optional.of(9443);
private int notExistFieldInServiceConfig = 0;

@Override
Expand Down Expand Up @@ -102,6 +104,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
printWriter.println("brokerClientAuthenticationParameters=role:my-role");
printWriter.println("superUserRoles=appid1,appid2");
printWriter.println("brokerServicePort=7777");
printWriter.println("brokerServicePortTls=8777");
printWriter.println("webServicePort=");
printWriter.println("webServicePortTls=");
printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0");
printWriter.println("managedLedgerDigestType=CRC32C");
printWriter.println("managedLedgerCacheSizeMB=");
Expand All @@ -116,6 +121,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777));
assertEquals(serviceConfig.getBrokerServicePortTls().get(), new Integer(8777));
assertFalse(serviceConfig.getWebServicePort().isPresent());
assertFalse(serviceConfig.getWebServicePortTls().isPresent());
assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
assertTrue(serviceConfig.getManagedLedgerCacheSizeMB() > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -81,12 +82,12 @@ void setup() throws Exception {
brokerNativeBrokerPorts[i] = PortManager.nextFreePort();

ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(brokerWebServicePorts[i]);
config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i]));
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Optional;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -41,16 +53,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import java.security.cert.X509Certificate;
import java.util.List;

@Slf4j
public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {

Expand All @@ -62,8 +64,8 @@ private static String getTLSFile(String name) {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.Optional;

import static org.testng.Assert.fail;

import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -35,10 +41,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Method;

import static org.testng.Assert.fail;

@Slf4j
public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
protected String methodName;
Expand All @@ -55,8 +57,8 @@ private static String getTLSFile(String name) {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
buildConf(conf);
super.internalSetup();
}
Expand Down Expand Up @@ -109,10 +111,10 @@ public void testPersistentList() throws Exception {

/***** Start Broker 2 ******/
ServiceConfiguration conf = new ServiceConfiguration();
conf.setBrokerServicePort(PortManager.nextFreePort());
conf.setBrokerServicePortTls(PortManager.nextFreePort());
conf.setWebServicePort(PortManager.nextFreePort());
conf.setWebServicePortTls(PortManager.nextFreePort());
conf.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setAdvertisedAddress("localhost");
conf.setClusterName(this.conf.getClusterName());
conf.setZookeeperServers("localhost:2181");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -91,8 +92,10 @@ public MockedPulsarServiceBaseTest() {

protected void resetConfig() {
this.conf = new ServiceConfiguration();
this.conf.setBrokerServicePort(BROKER_PORT);
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setAdvertisedAddress("localhost");
this.conf.setBrokerServicePort(Optional.ofNullable(BROKER_PORT));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT));
this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -119,9 +120,9 @@ void setup() throws Exception {
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setClusterName("use");
config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT));
config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT));
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
config1.setAdvertisedAddress("localhost");
Expand All @@ -138,9 +139,9 @@ void setup() throws Exception {
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setClusterName("use");
config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT));
config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT));
config2.setFailureDomainsEnabled(true);
pulsar2 = new PulsarService(config2);
secondaryHost = String.format("%s:%d", "localhost",
Expand Down
Loading