diff --git a/conf/proxy.conf b/conf/proxy.conf index 4194bf7621985..8285e1cb75320 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -28,17 +28,19 @@ metadataStoreUrl= # The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl configurationMetadataStoreUrl= -# If Service Discovery is Disabled this url should point to the discovery service provider. +# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the discovery service +# provider, and does not support multi urls yet. # The URL must begin with pulsar:// for plaintext or with pulsar+ssl:// for TLS. brokerServiceURL= brokerServiceURLTLS= -# These settings are unnecessary if `zookeeperServers` is specified +# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the discovery service +# provider, and does not support multi urls yet. brokerWebServiceURL= brokerWebServiceURLTLS= -# If function workers are setup in a separate cluster, configure the following 2 settings -# to point to the function workers cluster +# If function workers are setup in a separate cluster, configure the following 2 settings. This url should point to +# the discovery service provider of the function workers cluster, and does not support multi urls yet. functionWorkerWebServiceURL= functionWorkerWebServiceURLTLS= diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 148eb579ed196..4ec5b3f77a3e1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -173,23 +173,29 @@ public class ProxyConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The service url points to the broker cluster. URL must have the pulsar:// prefix." + doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the" + + " discovery service provider." + + " URL must have the pulsar:// prefix. And does not support multi url yet." ) private String brokerServiceURL; @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The tls service url points to the broker cluster. URL must have the pulsar+ssl:// prefix." + doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the" + + " discovery service provider." + + " URL must have the pulsar+ssl:// prefix. And does not support multi url yet." ) private String brokerServiceURLTLS; @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The web service url points to the broker cluster" + doc = "The web service url points to the discovery service provider of the broker cluster, and does not support" + + " multi url yet." ) private String brokerWebServiceURL; @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The tls web service url points to the broker cluster" + doc = "The tls web service url points to the discovery service provider of the broker cluster, and does not" + + " support multi url yet." ) private String brokerWebServiceURLTLS; @@ -201,13 +207,15 @@ public class ProxyConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The web service url points to the function worker cluster." + doc = "The web service url points to the discovery service provider of the function worker cluster, and does" + + " not support multi url yet." + " Only configure it when you setup function workers in a separate cluster" ) private String functionWorkerWebServiceURL; @FieldContext( category = CATEGORY_BROKER_DISCOVERY, - doc = "The tls web service url points to the function worker cluster." + doc = "The tls web service url points to the discovery service provider of the function worker cluster, and" + + " does not support multi url yet." + " Only configure it when you setup function workers in a separate cluster" ) private String functionWorkerWebServiceURLTLS; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 7427331641318..e623d4b85aa09 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -159,11 +159,28 @@ public ProxyServiceStarter(String[] args) throws Exception { if (isNotBlank(config.getBrokerServiceURL())) { checkArgument(config.getBrokerServiceURL().startsWith("pulsar://"), "brokerServiceURL must start with pulsar://"); + ensureUrlNotContainsComma("brokerServiceURL", config.getBrokerServiceURL()); } - if (isNotBlank(config.getBrokerServiceURLTLS())) { checkArgument(config.getBrokerServiceURLTLS().startsWith("pulsar+ssl://"), "brokerServiceURLTLS must start with pulsar+ssl://"); + ensureUrlNotContainsComma("brokerServiceURLTLS", config.getBrokerServiceURLTLS()); + } + + if (isNotBlank(config.getBrokerWebServiceURL())) { + ensureUrlNotContainsComma("brokerWebServiceURL", config.getBrokerWebServiceURL()); + } + if (isNotBlank(config.getBrokerWebServiceURLTLS())) { + ensureUrlNotContainsComma("brokerWebServiceURLTLS", config.getBrokerWebServiceURLTLS()); + } + + if (isNotBlank(config.getFunctionWorkerWebServiceURL())) { + ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS", + config.getFunctionWorkerWebServiceURL()); + } + if (isNotBlank(config.getFunctionWorkerWebServiceURLTLS())) { + ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS", + config.getFunctionWorkerWebServiceURLTLS()); } if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS())) @@ -184,6 +201,11 @@ public ProxyServiceStarter(String[] args) throws Exception { } } + private void ensureUrlNotContainsComma(String paramName, String paramValue) { + checkArgument(!paramValue.contains(","), paramName + " does not support multi urls yet," + + " it should point to the discovery service provider."); + } + public static void main(String[] args) throws Exception { ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); try { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java index 97a73c20b60d0..a9a562e04c899 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java @@ -20,6 +20,8 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.beans.Introspector; @@ -36,6 +38,8 @@ import java.util.Properties; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; @Test(groups = "broker") public class ProxyConfigurationTest { @@ -134,4 +138,119 @@ public void testConvert() throws IOException { } } + @Test + public void testBrokerUrlCheck() throws IOException { + ProxyConfiguration configuration = new ProxyConfiguration(); + // brokerServiceURL must start with pulsar:// + configuration.setBrokerServiceURL("127.0.0.1:6650"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerServiceURL must start with pulsar://"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("brokerServiceURL must start with pulsar://")); + } + } + configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650"); + + // brokerServiceURLTLS must start with pulsar+ssl:// + configuration.setBrokerServiceURLTLS("pulsar://127.0.0.1:6650"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerServiceURLTLS must start with pulsar+ssl://"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("brokerServiceURLTLS must start with pulsar+ssl://")); + } + } + + // brokerServiceURL did not support multi urls yet. + configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650,pulsar://127.0.0.2:6650"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerServiceURL does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650"); + + // brokerServiceURLTLS did not support multi urls yet. + configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650,pulsar+ssl:127.0.0.2:6650"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerServiceURLTLS does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650"); + + // brokerWebServiceURL did not support multi urls yet. + configuration.setBrokerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerWebServiceURL does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setBrokerWebServiceURL("http://127.0.0.1:8080"); + + // brokerWebServiceURLTLS did not support multi urls yet. + configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443,https://127.0.0.2:443"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("brokerWebServiceURLTLS does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443"); + + // functionWorkerWebServiceURL did not support multi urls yet. + configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("functionWorkerWebServiceURL does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080"); + + // functionWorkerWebServiceURLTLS did not support multi urls yet. + configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443,http://127.0.0.2:443"); + try (MockedStatic theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) { + theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any())) + .thenReturn(configuration); + try { + new ProxyServiceStarter(ProxyServiceStarterTest.ARGS); + fail("functionWorkerWebServiceURLTLS does not support multi urls yet"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not support multi urls yet")); + } + } + configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443"); + } + } \ No newline at end of file diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index def58be6df372..a9bead706a373 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -45,7 +45,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { - static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; + public static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; protected ProxyServiceStarter serviceStarter; protected String serviceUrl;