Skip to content

Commit

Permalink
[improve] [proxy] Add a check for brokerServiceURL that does not supp…
Browse files Browse the repository at this point in the history
…ort multi uri yet (apache#21972)

### Motivation

At the beginning of the design, these two configurations(`brokerServiceURL & brokerServiceURLTLS`) do not support setting multiple broker addresses, which should instead be set to a “discovery service provider.” see: apache#1002 and apache#14682

Users will get the below error if they set A to a multi-broker URLs

```
"2024-01-09 00:20:10,261 -0800 [pulsar-proxy-io-4-7] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.IllegalArgumentException: port out of range:-1
        at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143) ~[?:?]
        at java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254) ~[?:?]
        at org.apache.pulsar.proxy.server.LookupProxyHandler.getAddr(LookupProxyHandler.java:432) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
        at org.apache.pulsar.proxy.server.LookupProxyHandler.handleGetSchema(LookupProxyHandler.java:357) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
        at org.apache.pulsar.proxy.server.ProxyConnection.handleGetSchema(ProxyConnection.java:463) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:326) ~[io.streamnative-pulsar-common-2.9.2.12.jar:2.9.2.12]
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:221) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
```

### Modifications
- Improve the description
- Add a check to prevent wrong settings
  • Loading branch information
poorbarcode authored and Technoboy- committed Jan 31, 2024
1 parent 75e2142 commit 8a56873
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 12 deletions.
10 changes: 6 additions & 4 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ metadataStoreUrl=
# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
configurationMetadataStoreUrl=

# If Service Discovery is Disabled this url should point to the discovery service provider.
# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the discovery service
# provider, and does not support multi urls yet.
# The URL must begin with pulsar:// for plaintext or with pulsar+ssl:// for TLS.
brokerServiceURL=
brokerServiceURLTLS=

# These settings are unnecessary if `zookeeperServers` is specified
# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the discovery service
# provider, and does not support multi urls yet.
brokerWebServiceURL=
brokerWebServiceURLTLS=

# If function workers are setup in a separate cluster, configure the following 2 settings
# to point to the function workers cluster
# If function workers are setup in a separate cluster, configure the following 2 settings. This url should point to
# the discovery service provider of the function workers cluster, and does not support multi urls yet.
functionWorkerWebServiceURL=
functionWorkerWebServiceURLTLS=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,35 +173,43 @@ public class ProxyConfiguration implements PulsarConfiguration {

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The service url points to the broker cluster. URL must have the pulsar:// prefix."
doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the"
+ " discovery service provider."
+ " URL must have the pulsar:// prefix. And does not support multi url yet."
)
private String brokerServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The tls service url points to the broker cluster. URL must have the pulsar+ssl:// prefix."
doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the"
+ " discovery service provider."
+ " URL must have the pulsar+ssl:// prefix. And does not support multi url yet."
)
private String brokerServiceURLTLS;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The web service url points to the broker cluster"
doc = "The web service url points to the discovery service provider of the broker cluster, and does not support"
+ " multi url yet."
)
private String brokerWebServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The tls web service url points to the broker cluster"
doc = "The tls web service url points to the discovery service provider of the broker cluster, and does not"
+ " support multi url yet."
)
private String brokerWebServiceURLTLS;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The web service url points to the function worker cluster."
doc = "The web service url points to the discovery service provider of the function worker cluster, and does"
+ " not support multi url yet."
+ " Only configure it when you setup function workers in a separate cluster"
)
private String functionWorkerWebServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The tls web service url points to the function worker cluster."
doc = "The tls web service url points to the discovery service provider of the function worker cluster, and"
+ " does not support multi url yet."
+ " Only configure it when you setup function workers in a separate cluster"
)
private String functionWorkerWebServiceURLTLS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,28 @@ public ProxyServiceStarter(String[] args) throws Exception {
if (isNotBlank(config.getBrokerServiceURL())) {
checkArgument(config.getBrokerServiceURL().startsWith("pulsar://"),
"brokerServiceURL must start with pulsar://");
ensureUrlNotContainsComma("brokerServiceURL", config.getBrokerServiceURL());
}

if (isNotBlank(config.getBrokerServiceURLTLS())) {
checkArgument(config.getBrokerServiceURLTLS().startsWith("pulsar+ssl://"),
"brokerServiceURLTLS must start with pulsar+ssl://");
ensureUrlNotContainsComma("brokerServiceURLTLS", config.getBrokerServiceURLTLS());
}

if (isNotBlank(config.getBrokerWebServiceURL())) {
ensureUrlNotContainsComma("brokerWebServiceURL", config.getBrokerWebServiceURL());
}
if (isNotBlank(config.getBrokerWebServiceURLTLS())) {
ensureUrlNotContainsComma("brokerWebServiceURLTLS", config.getBrokerWebServiceURLTLS());
}

if (isNotBlank(config.getFunctionWorkerWebServiceURL())) {
ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS",
config.getFunctionWorkerWebServiceURL());
}
if (isNotBlank(config.getFunctionWorkerWebServiceURLTLS())) {
ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS",
config.getFunctionWorkerWebServiceURLTLS());
}

if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
Expand All @@ -184,6 +201,11 @@ public ProxyServiceStarter(String[] args) throws Exception {
}
}

private void ensureUrlNotContainsComma(String paramName, String paramValue) {
checkArgument(!paramValue.contains(","), paramName + " does not support multi urls yet,"
+ " it should point to the discovery service provider.");
}

public static void main(String[] args) throws Exception {
ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@


import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import java.beans.Introspector;
Expand All @@ -36,6 +38,8 @@
import java.util.Properties;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Test(groups = "broker")
public class ProxyConfigurationTest {
Expand Down Expand Up @@ -134,4 +138,119 @@ public void testConvert() throws IOException {
}
}

@Test
public void testBrokerUrlCheck() throws IOException {
ProxyConfiguration configuration = new ProxyConfiguration();
// brokerServiceURL must start with pulsar://
configuration.setBrokerServiceURL("127.0.0.1:6650");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerServiceURL must start with pulsar://");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("brokerServiceURL must start with pulsar://"));
}
}
configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650");

// brokerServiceURLTLS must start with pulsar+ssl://
configuration.setBrokerServiceURLTLS("pulsar://127.0.0.1:6650");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerServiceURLTLS must start with pulsar+ssl://");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("brokerServiceURLTLS must start with pulsar+ssl://"));
}
}

// brokerServiceURL did not support multi urls yet.
configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650,pulsar://127.0.0.2:6650");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650");

// brokerServiceURLTLS did not support multi urls yet.
configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650,pulsar+ssl:127.0.0.2:6650");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650");

// brokerWebServiceURL did not support multi urls yet.
configuration.setBrokerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerWebServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setBrokerWebServiceURL("http://127.0.0.1:8080");

// brokerWebServiceURLTLS did not support multi urls yet.
configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443,https://127.0.0.2:443");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("brokerWebServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443");

// functionWorkerWebServiceURL did not support multi urls yet.
configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("functionWorkerWebServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080");

// functionWorkerWebServiceURLTLS did not support multi urls yet.
configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443,http://127.0.0.2:443");
try (MockedStatic<PulsarConfigurationLoader> theMock = Mockito.mockStatic(PulsarConfigurationLoader.class)) {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
fail("functionWorkerWebServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
}
}
configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {

static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
public static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};

protected ProxyServiceStarter serviceStarter;
protected String serviceUrl;
Expand Down

0 comments on commit 8a56873

Please sign in to comment.