diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java index 0cd5bce5d51cf..b1e608b620a18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java @@ -48,6 +48,7 @@ protected void onCleanup() { } catch (Exception e) { log.error("Error in stopping ZK server", e); } + testZKServer = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 4a6524bf24521..941229fc3d96c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -126,15 +126,26 @@ private void createTenant(PulsarAdmin pulsarAdmin) @AfterClass(alwaysRun = true) public void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); - executor = null; + if (executor != null) { + executor.shutdownNow(); + executor = null; + } for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); - pulsarServices[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; + } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index ded4ee8e58d53..358410f1f28e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -59,7 +59,10 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } log.info("---- bk stopped ----"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 04a2175b1d10f..f2a9e8643b230 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -153,13 +153,20 @@ void shutdown() throws Exception { executor.shutdownNow(); for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } if (pulsarServices[i] != null) { pulsarServices[i].close(); + pulsarServices[i] = null; } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private void loopUntilLeaderChangesForAllBroker(List activePulsars, LeaderBroker oldLeader) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index f92faa3e4bd54..f6ee4ea95b601 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -163,15 +163,33 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } - admin1.close(); - admin2.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } - pulsar2.close(); - pulsar1.close(); + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private void createNamespacePolicies(PulsarService pulsar) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index e199e695cf777..4945da7a4f063 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -193,8 +193,14 @@ private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsar) { @AfterClass(alwaysRun = true) void shutdown() throws Exception { - executor.shutdownNow(); - bkEnsemble.stop(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index c74820e69a1a3..1d0b2f4216fc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -99,8 +99,14 @@ protected void setup() throws Exception { @Override @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - this.additionalPulsarTestContext.close(); + if (additionalPulsarTestContext != null) { + additionalPulsarTestContext.close(); + additionalPulsarTestContext = null; + } super.internalCleanup(); + pulsar1 = pulsar2 = null; + primaryLoadManager = secondaryLoadManager = null; + channel1 = channel2 = null; } @BeforeMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 3173987a3c8a8..bc49352f41d21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -150,8 +150,14 @@ public void testLoadBalancerNamespaceMaximumBundles() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsar.close(); - bkEnsemble.stop(); + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 8609d5889af67..75ca0a1bf55cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -224,19 +224,36 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdownNow(); - - admin1.close(); - admin2.close(); + if (executor != null) { + executor.shutdownNow(); + executor = null; + } - pulsar2.close(); - pulsar1.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } - if (pulsar3.isRunning()) { + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } + if (pulsar3 != null && pulsar3.isRunning()) { pulsar3.close(); } - - bkEnsemble.stop(); + pulsar3 = null; + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java index 19e40ebf9960f..a60d6599e8f76 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java @@ -66,8 +66,14 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void shutdown() throws Exception { - pulsar.close(); - bkEnsemble.stop(); + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index 9b3d145dea230..2402bbe936751 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -116,9 +116,18 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { config = null; markCurrentSetupNumberCleaned(); - admin.close(); - pulsar.close(); - bkEnsemble.stop(); + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 3b2f3cf215ea3..f08edc36898bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -105,8 +105,12 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { if (pulsarService != null) { pulsarService.close(); + pulsarService = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; } - bkEnsemble.stop(); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index bc6df685ffcd7..a1cb4abc4c30b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -43,7 +43,6 @@ @Slf4j public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetrySupport { - protected final String defaultTenant = "public"; protected final String defaultNamespace = defaultTenant + "/default"; protected int numberOfBookies = 3; @@ -60,6 +59,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr protected ZooKeeper localZkOfBroker; protected Object localMetaDataStoreClientCnx; protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean(); + protected void startZKAndBK() throws Exception { // Start ZK. brokerConfigZk = new ZookeeperServerTest(0); @@ -198,18 +198,30 @@ protected void cleanup() throws Exception { stopLocalMetadataStoreAlwaysReconnect(); // Stop brokers. - client.close(); - admin.close(); + if (client != null) { + client.close(); + client = null; + } + if (admin != null) { + admin.close(); + admin = null; + } if (pulsar != null) { pulsar.close(); + pulsar = null; } // Stop ZK and BK. - bkEnsemble.stop(); - brokerConfigZk.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } + if (brokerConfigZk != null) { + brokerConfigZk.stop(); + brokerConfigZk = null; + } // Reset configs. config = new ServiceConfiguration(); - setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index fb15661fddf2d..84543a82d7725 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -91,8 +91,18 @@ void setup() { @AfterMethod(alwaysRun = true) void shutdown() { try { - pulsar.close(); - bkEnsemble.stop(); + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } catch (Throwable t) { t.printStackTrace(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 33620716288af..795983f7482e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -193,27 +193,51 @@ protected void cleanup() throws Exception { log.info("--- Shutting down ---"); // Stop brokers. - client1.close(); - client2.close(); - admin1.close(); - admin2.close(); + if (client1 != null) { + client1.close(); + client1 = null; + } + if (client2 != null) { + client2.close(); + client2 = null; + } + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } if (pulsar2 != null) { pulsar2.close(); + pulsar2 = null; } if (pulsar1 != null) { pulsar1.close(); + pulsar1 = null; } // Stop ZK and BK. - bkEnsemble1.stop(); - bkEnsemble2.stop(); - brokerConfigZk1.stop(); - brokerConfigZk2.stop(); + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (brokerConfigZk1 != null) { + brokerConfigZk1.stop(); + brokerConfigZk1 = null; + } + if (brokerConfigZk2 != null) { + brokerConfigZk2.stop(); + brokerConfigZk2 = null; + } // Reset configs. config1 = new ServiceConfiguration(); - setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1); config2 = new ServiceConfiguration(); - setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 3fa3a8dfc0f15..bd0496e07188a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -130,9 +130,11 @@ public class ReplicatorTest extends ReplicatorTestBase { @BeforeMethod(alwaysRun = true) public void beforeMethod(Method m) throws Exception { methodName = m.getName(); - admin1.namespaces().removeBacklogQuota("pulsar/ns"); - admin1.namespaces().removeBacklogQuota("pulsar/ns1"); - admin1.namespaces().removeBacklogQuota("pulsar/global/ns"); + if (admin1 != null) { + admin1.namespaces().removeBacklogQuota("pulsar/ns"); + admin1.namespaces().removeBacklogQuota("pulsar/ns1"); + admin1.namespaces().removeBacklogQuota("pulsar/global/ns"); + } } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index beb1a3c4b9309..97a2f633d81a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -348,22 +348,18 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, public void resetConfig1() { config1 = new ServiceConfiguration(); - setConfig1DefaultValue(); } public void resetConfig2() { config2 = new ServiceConfiguration(); - setConfig2DefaultValue(); } public void resetConfig3() { config3 = new ServiceConfiguration(); - setConfig3DefaultValue(); } public void resetConfig4() { config4 = new ServiceConfiguration(); - setConfig4DefaultValue(); } private int inSec(int time, TimeUnit unit) { @@ -379,29 +375,60 @@ protected void cleanup() throws Exception { executor = null; } - admin1.close(); - admin2.close(); - admin3.close(); - admin4.close(); + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } + if (admin3 != null) { + admin3.close(); + admin3 = null; + } + if (admin4 != null) { + admin4.close(); + admin4 = null; + } if (pulsar4 != null) { pulsar4.close(); + pulsar4 = null; } if (pulsar3 != null) { pulsar3.close(); + pulsar3 = null; } if (pulsar2 != null) { pulsar2.close(); + pulsar2 = null; } if (pulsar1 != null) { pulsar1.close(); + pulsar1 = null; } - bkEnsemble1.stop(); - bkEnsemble2.stop(); - bkEnsemble3.stop(); - bkEnsemble4.stop(); - globalZkS.stop(); + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (bkEnsemble3 != null) { + bkEnsemble3.stop(); + bkEnsemble3 = null; + } + if (bkEnsemble4 != null) { + bkEnsemble4.stop(); + bkEnsemble4 = null; + } + if (globalZkS != null) { + globalZkS.stop(); + globalZkS = null; + } resetConfig1(); resetConfig2(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index 5a8fd34c9cdba..521d68cebe599 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -117,10 +117,19 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { - pulsarServices[i].close(); - pulsarAdmins[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; + } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; + } + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; } - bkEnsemble.stop(); } @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index 7a0fb48f91150..5bf48932f3687 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -122,22 +122,27 @@ public final void shutdownAll() throws Exception { protected void cleanup() throws Exception { if (transactionCoordinatorClient != null) { transactionCoordinatorClient.close(); + transactionCoordinatorClient = null; } - for (PulsarAdmin admin : pulsarAdmins) { - if (admin != null) { - admin.close(); + for (int i = 0; i < BROKER_COUNT; i++) { + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + pulsarAdmins[i] = null; } } if (pulsarClient != null) { pulsarClient.close(); + pulsarClient = null; } - for (PulsarService service : pulsarServices) { - if (service != null) { - service.close(); + for (int i = 0; i < BROKER_COUNT; i++) { + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + pulsarServices[i] = null; } } if (bkEnsemble != null) { bkEnsemble.stop(); + bkEnsemble = null; } Mockito.reset(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index 6b3b05405baea..601a8d76aaacd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -127,10 +127,22 @@ void setup(Method method) throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsar = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private static class ProducerThread implements Runnable { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index cbf2f28b0b50b..e9b3531c7c2e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -216,11 +216,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { try { log.info("--- Shutting down ---"); - pulsarClient.close(); - superUserAdmin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 72e6a3766aeee..68afa83e88b2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -218,11 +218,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { try { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 3508cf0bfc7e6..3be16357d332b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -202,11 +202,13 @@ void tearDown() throws Exception { for (int i = 0; i < BROKER_COUNT; i++) { if (pulsarAdmins[i] != null) { pulsarAdmins[i].close(); + pulsarAdmins[i] = null; } } for (int i = 0; i < BROKER_COUNT; i++) { if (fnWorkerServices[i] != null) { fnWorkerServices[i].stop(); + fnWorkerServices[i] = null; } } for (int i = 0; i < BROKER_COUNT; i++) { @@ -221,9 +223,13 @@ void tearDown() throws Exception { getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); pulsarServices[i].getConfiguration() .getWebServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i] = null; } } - bkEnsemble.stop(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { for (int i = 0; i < BROKER_COUNT; i++) { if (tempDirectories[i] != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 6226fa904885c..9c137e37095ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -131,11 +131,26 @@ void setup(Method method) throws Exception { void shutdown() { log.info("--- Shutting down ---"); try { - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } catch (Exception e) { log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", e); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 3c0dd0822b7dc..d27e27639048e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -239,29 +239,35 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { void shutdown() throws Exception { log.info("--- Shutting down ---"); try { - if (fileServer != null) { - fileServer.stop(); - } + if (fileServer != null) { + fileServer.stop(); + fileServer = null; + } - if (pulsarClient != null) { - pulsarClient.close(); - } + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } - if (admin != null) { - admin.close(); - } + if (admin != null) { + admin.close(); + admin = null; + } - if (functionsWorkerService != null) { - functionsWorkerService.stop(); - } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } - if (pulsar != null) { - pulsar.close(); - } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } - if (bkEnsemble != null) { - bkEnsemble.stop(); - } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index 22b9ad0df3a69..aafd82d339a1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -172,11 +172,26 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - pulsarClient.close(); - admin.close(); - functionsWorkerService.stop(); - pulsar.close(); - bkEnsemble.stop(); + if (pulsarClient != null) { + pulsarClient.close(); + pulsarClient = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 810ac69ac3eb3..da479321b8bc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -180,10 +180,22 @@ void setup(Method method) throws Exception { void shutdown() throws Exception { log.info("--- Shutting down ---"); try { - functionAdmin.close(); - functionsWorkerService.stop(); - workerServer.stop(); - bkEnsemble.stop(); + if (functionAdmin != null) { + functionAdmin.close(); + functionAdmin = null; + } + if (functionsWorkerService != null) { + functionsWorkerService.stop(); + functionsWorkerService = null; + } + if (workerServer != null) { + workerServer.stop(); + workerServer = null; + } + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java index 013bc23d6630f..355d2a0b1dbe1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java @@ -68,15 +68,20 @@ public void start() throws IOException { } public void stop() throws IOException { - zks.shutdown(); - serverFactory.shutdown(); - log.info("Stoppend ZK server at {}", hostPort); + if (zks != null) { + zks.shutdown(); + zks = null; + } + if (serverFactory != null) { + serverFactory.shutdown(); + serverFactory = null; + } + log.info("Stopped ZK server at {}", hostPort); } @Override public void close() throws IOException { - zks.shutdown(); - serverFactory.shutdown(); + stop(); zkTmpDir.delete(); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java index 33034ddb3fe0f..0d01d9c56abc8 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java @@ -131,6 +131,7 @@ public void checkContainers() throws Exception { public void stop() throws Exception { if (zooKeeperServerEmbedded != null) { zooKeeperServerEmbedded.close(); + zooKeeperServerEmbedded = null; } log.info("Stopped test ZK server"); }