From f159f62e8017a4286775cfe65632b1415bda9c4b Mon Sep 17 00:00:00 2001 From: lipenghui Date: Sat, 2 Jul 2022 06:29:16 +0800 Subject: [PATCH] [improve][java-client] Support passing existing scheduled executor providers to the client (#16334) * [improve][java-client] Support passing existing scheduled executor providers to the client ### Motivation #16236 introduced a new scheduled executor but does not support passing the existing scheduled executor providers like #12037. * Apply comment. --- .../pulsar/client/impl/PulsarClientImpl.java | 35 ++++++++++++------- .../client/impl/PulsarClientImplTest.java | 10 ++++++ 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 2c6339cdb2028..a5ee6596834dd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -102,6 +102,8 @@ public class PulsarClientImpl implements PulsarClient { protected final ClientConfigurationData conf; private final boolean createdExecutorProviders; + + private final boolean createdScheduledProviders; private LookupService lookup; private final ConnectionPool cnxPool; @Getter @@ -149,28 +151,29 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null); + this(conf, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, - ExecutorProvider internalExecutorProvider) throws PulsarClientException { + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; try { @@ -181,9 +184,10 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG "Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified."); } this.createdExecutorProviders = externalExecutorProvider == null; + this.createdScheduledProviders = scheduledExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; - if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) { + if (conf == null || isBlank(conf.getServiceUrl())) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } setAuth(conf); @@ -197,8 +201,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); - this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(), - "pulsar-client-scheduled"); + this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : + new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { @@ -868,8 +872,8 @@ private void shutdownEventLoopGroup(EventLoopGroup eventLoopGroup) throws Pulsar } private void shutdownExecutors() throws PulsarClientException { + PulsarClientException pulsarClientException = null; if (createdExecutorProviders) { - PulsarClientException pulsarClientException = null; if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) { try { @@ -887,11 +891,18 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } - - if (pulsarClientException != null) { - throw pulsarClientException; + } + if (createdScheduledProviders && scheduledExecutorProvider != null && !scheduledExecutorProvider.isShutdown()) { + try { + externalExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown scheduledExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); } } + if (pulsarClientException != null) { + throw pulsarClientException; + } } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 4d9977f547803..93fe3e34876ec 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertSame; @@ -55,6 +56,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; @@ -230,16 +232,24 @@ public void testInitializingWithExecutorProviders() throws PulsarClientException ClientConfigurationData conf = clientImpl.conf; @Cleanup("shutdownNow") ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor"); + @Cleanup("shutdownNow") + ScheduledExecutorProvider scheduledExecutorProvider = + new ScheduledExecutorProvider(2, "scheduled-executor"); @Cleanup PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf) .internalExecutorProvider(executorProvider) .externalExecutorProvider(executorProvider) + .scheduledExecutorProvider(scheduledExecutorProvider) .build(); @Cleanup PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf) .internalExecutorProvider(executorProvider) .externalExecutorProvider(executorProvider) + .scheduledExecutorProvider(scheduledExecutorProvider) .build(); + + assertEquals(client2.getScheduledExecutorProvider(), scheduledExecutorProvider); + assertEquals(client3.getScheduledExecutorProvider(), scheduledExecutorProvider); } @Test(expectedExceptions = IllegalArgumentException.class,