diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java index e5b80c0af33ab5..a78254df4aae01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java @@ -59,6 +59,7 @@ protected void onCleanup() { } catch (Exception e) { log.error("Error in stopping ZK server", e); } + testZKServer = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 4a6524bf245216..941229fc3d96cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -126,15 +126,26 @@ private void createTenant(PulsarAdmin pulsarAdmin) @AfterClass(alwaysRun = true) public void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); - executor = null; + if (executor != null) { + executor.shutdownNow(); + executor = null; + } for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); - pulsarServices[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; + } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index ded4ee8e58d537..358410f1f28e3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -59,7 +59,10 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } log.info("---- bk stopped ----"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 7a2314b01a3d10..95aafd84ae406b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -147,11 +147,20 @@ void shutdown() throws Exception { log.info("--- Shutting down ---"); for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); - pulsarServices[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; + } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private void loopUntilLeaderChangesForAllBroker(List activePulsars, LeaderBroker oldLeader) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index f6154e3ec8e304..8f7aa17d0d7bf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -164,15 +164,33 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } - admin1.close(); - admin2.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } - pulsar2.close(); - pulsar1.close(); + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private void createNamespacePolicies(PulsarService pulsar) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index fdd1eb7272c307..42600a42035516 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -192,8 +192,14 @@ private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsar) { @AfterClass(alwaysRun = true) void shutdown() throws Exception { - executor.shutdownNow(); - bkEnsemble.stop(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 3173987a3c8a88..bc49352f41d218 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -150,8 +150,14 @@ public void testLoadBalancerNamespaceMaximumBundles() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsar.close(); - bkEnsemble.stop(); + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 824291c52da77c..1f9cd806e19b5a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -228,19 +228,36 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); - - admin1.close(); - admin2.close(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } - pulsar2.close(); - pulsar1.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } - if (pulsar3.isRunning()) { + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } + if (pulsar3 != null && pulsar3.isRunning()) { pulsar3.close(); } - - bkEnsemble.stop(); + pulsar3 = null; + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 19e40ebf9960fa..a60d6599e8f76a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -66,8 +66,14 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void shutdown() throws Exception { - pulsar.close(); - bkEnsemble.stop(); + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index 3d9ba658f770e4..71c5a995643c61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -119,9 +119,18 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { config = null; markCurrentSetupNumberCleaned(); - admin.close(); - pulsar.close(); - bkEnsemble.stop(); + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 19aa3ae0bd1c97..d7272fcffa9647 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -105,8 +105,12 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { if (pulsarService != null) { pulsarService.close(); + pulsarService = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; } - bkEnsemble.stop(); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index bc6df685ffcd78..21b0408ce078aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -43,7 +43,6 @@ @Slf4j public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetrySupport { - protected final String defaultTenant = "public"; protected final String defaultNamespace = defaultTenant + "/default"; protected int numberOfBookies = 3; @@ -60,6 +59,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr protected ZooKeeper localZkOfBroker; protected Object localMetaDataStoreClientCnx; protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean(); + protected void startZKAndBK() throws Exception { // Start ZK. brokerConfigZk = new ZookeeperServerTest(0); @@ -198,15 +198,28 @@ protected void cleanup() throws Exception { stopLocalMetadataStoreAlwaysReconnect(); // Stop brokers. - client.close(); - admin.close(); + if (client != null) { + client.close(); + client = null; + } + if (admin != null) { + admin.close(); + admin = null; + } if (pulsar != null) { pulsar.close(); + pulsar = null; } // Stop ZK and BK. - bkEnsemble.stop(); - brokerConfigZk.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } + if (brokerConfigZk != null) { + brokerConfigZk.stop(); + brokerConfigZk = null; + } // Reset configs. config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index 780d33de521b3e..84543a82d77259 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -91,9 +91,18 @@ void setup() { @AfterMethod(alwaysRun = true) void shutdown() { try { - pulsar.close(); - bkEnsemble.stop(); - admin.close(); + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } catch (Throwable t) { t.printStackTrace(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 95f976f965a0d8..88cd076f4a52a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -240,22 +240,48 @@ protected void cleanup() throws Exception { log.info("--- Shutting down ---"); // Stop brokers. - client1.close(); - client2.close(); - admin1.close(); - admin2.close(); + if (client1 != null) { + client1.close(); + client1 = null; + } + if (client2 != null) { + client2.close(); + client2 = null; + } + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } if (pulsar2 != null) { pulsar2.close(); + pulsar2 = null; } if (pulsar1 != null) { pulsar1.close(); + pulsar1 = null; } // Stop ZK and BK. - bkEnsemble1.stop(); - bkEnsemble2.stop(); - brokerConfigZk1.stop(); - brokerConfigZk2.stop(); + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (brokerConfigZk1 != null) { + brokerConfigZk1.stop(); + brokerConfigZk1 = null; + } + if (brokerConfigZk2 != null) { + brokerConfigZk2.stop(); + brokerConfigZk2 = null; + } // Reset configs. config1 = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index ba9f850ff0cc1b..f507caa100dea2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -386,29 +386,60 @@ protected void cleanup() throws Exception { executor = null; } - admin1.close(); - admin2.close(); - admin3.close(); - admin4.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } + if (admin3 != null) { + admin3.close(); + admin3 = null; + } + if (admin4 != null) { + admin4.close(); + admin4 = null; + } if (pulsar4 != null) { pulsar4.close(); + pulsar4 = null; } if (pulsar3 != null) { pulsar3.close(); + pulsar3 = null; } if (pulsar2 != null) { pulsar2.close(); + pulsar2 = null; } if (pulsar1 != null) { pulsar1.close(); + pulsar1 = null; } - bkEnsemble1.stop(); - bkEnsemble2.stop(); - bkEnsemble3.stop(); - bkEnsemble4.stop(); - globalZkS.stop(); + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (bkEnsemble3 != null) { + bkEnsemble3.stop(); + bkEnsemble3 = null; + } + if (bkEnsemble4 != null) { + bkEnsemble4.stop(); + bkEnsemble4 = null; + } + if (globalZkS != null) { + globalZkS.stop(); + globalZkS = null; + } resetConfig1(); resetConfig2(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index 5a8fd34c9cdbab..521d68cebe599b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -117,10 +117,19 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { - pulsarServices[i].close(); - pulsarAdmins[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; + } + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; } - bkEnsemble.stop(); } @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 7a0fb48f91150e..5bf48932f3687e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -122,22 +122,27 @@ public final void shutdownAll() throws Exception { protected void cleanup() throws Exception { if (transactionCoordinatorClient != null) { transactionCoordinatorClient.close(); + transactionCoordinatorClient = null; } - for (PulsarAdmin admin : pulsarAdmins) { - if (admin != null) { - admin.close(); + for (int i = 0; i < BROKER_COUNT; i++) { + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; } } if (pulsarClient != null) { pulsarClient.close(); + pulsarClient = null; } - for (PulsarService service : pulsarServices) { - if (service != null) { - service.close(); + for (int i = 0; i < BROKER_COUNT; i++) { + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; } } if (bkEnsemble != null) { bkEnsemble.stop(); + bkEnsemble = null; } Mockito.reset(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index 6b3b05405baea2..601a8d76aaacdd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -127,10 +127,22 @@ void setup(Method method) throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsar = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private static class ProducerThread implements Runnable { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index cbf2f28b0b50b8..e9b3531c7c2e2d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -216,11 +216,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { try { log.info("--- Shutting down ---"); - pulsarClient.close(); - superUserAdmin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 569c2d36ff3a78..50dc39a3a79d28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -219,11 +219,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { try { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 3508cf0bfc7e62..3be16357d332b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -202,11 +202,13 @@ void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarAdmins[i] != null) { pulsarAdmins[i].close(); + pulsarAdmins[i] = null; } } for (int i = 0; i < BROKER_COUNT; i++) { if (fnWorkerServices[i] != null) { fnWorkerServices[i].stop(); + fnWorkerServices[i] = null; } } for (int i = 0; i < BROKER_COUNT; i++) { @@ -221,9 +223,13 @@ void tearDown() throws Exception { getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); pulsarServices[i].getConfiguration() .getWebServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i] = null; } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { for (int i = 0; i < BROKER_COUNT; i++) { if (tempDirectories[i] != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 6226fa904885c3..9c137e37095edd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -131,11 +131,26 @@ void setup(Method method) throws Exception { void shutdown() { log.info("--- Shutting down ---"); try { - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } catch (Exception e) { log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", e); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 3c0dd0822b7dca..d27e27639048e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -239,29 +239,35 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { log.info("--- Shutting down ---"); try { - if (fileServer != null) { - fileServer.stop(); - } + if (fileServer != null) { + fileServer.stop(); + fileServer = null; + } - if (pulsarClient != null) { - pulsarClient.close(); - } + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } - if (admin != null) { - admin.close(); - } + if (admin != null) { + admin.close(); + admin = null; + } - if (functionsWorkerService != null) { - functionsWorkerService.stop(); - } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } - if (pulsar != null) { - pulsar.close(); - } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } - if (bkEnsemble != null) { - bkEnsemble.stop(); - } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 22b9ad0df3a694..aafd82d339a1d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -172,11 +172,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 810ac69ac3eb3d..da479321b8bc27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -180,10 +180,22 @@ void setup(Method method) throws Exception { void shutdown() throws Exception { log.info("--- Shutting down ---"); try { - functionAdmin.close(); - functionsWorkerService.stop(); - workerServer.stop(); - bkEnsemble.stop(); + if (functionAdmin != null) { + functionAdmin.close(); + functionAdmin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (workerServer != null) { + workerServer.stop(); + workerServer = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete();