Skip to content

Commit

Permalink
[improve][java-client] Support passing existing scheduled executor pr…
Browse files Browse the repository at this point in the history
…oviders to the client (apache#16334)

* [improve][java-client] Support passing existing scheduled executor providers to the client

### Motivation

apache#16236 introduced a new scheduled executor but does not support passing the existing scheduled executor providers like apache#12037.

* Apply comment.
  • Loading branch information
codelipenghui authored Jul 1, 2022
1 parent d9f5640 commit f159f62
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public class PulsarClientImpl implements PulsarClient {

protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;

private final boolean createdScheduledProviders;
private LookupService lookup;
private final ConnectionPool cnxPool;
@Getter
Expand Down Expand Up @@ -149,28 +151,29 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, null, null, null, null, null);
this(conf, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, null, null, null, null);
this(conf, eventLoopGroup, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, null, null);
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, null, null);
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
}

@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider) throws PulsarClientException {
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
Expand All @@ -181,9 +184,10 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
"Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
if (conf == null || isBlank(conf.getServiceUrl())) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
setAuth(conf);
Expand All @@ -197,8 +201,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-scheduled");
this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
Expand Down Expand Up @@ -868,8 +872,8 @@ private void shutdownEventLoopGroup(EventLoopGroup eventLoopGroup) throws Pulsar
}

private void shutdownExecutors() throws PulsarClientException {
PulsarClientException pulsarClientException = null;
if (createdExecutorProviders) {
PulsarClientException pulsarClientException = null;

if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
try {
Expand All @@ -887,11 +891,18 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
if (createdScheduledProviders && scheduledExecutorProvider != null && !scheduledExecutorProvider.isShutdown()) {
try {
externalExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown scheduledExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}
if (pulsarClientException != null) {
throw pulsarClientException;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -230,16 +232,24 @@ public void testInitializingWithExecutorProviders() throws PulsarClientException
ClientConfigurationData conf = clientImpl.conf;
@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
@Cleanup("shutdownNow")
ScheduledExecutorProvider scheduledExecutorProvider =
new ScheduledExecutorProvider(2, "scheduled-executor");
@Cleanup
PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
.scheduledExecutorProvider(scheduledExecutorProvider)
.build();
@Cleanup
PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
.scheduledExecutorProvider(scheduledExecutorProvider)
.build();

assertEquals(client2.getScheduledExecutorProvider(), scheduledExecutorProvider);
assertEquals(client3.getScheduledExecutorProvider(), scheduledExecutorProvider);
}

@Test(expectedExceptions = IllegalArgumentException.class,
Expand Down

0 comments on commit f159f62

Please sign in to comment.