diff --git a/conf/broker.conf b/conf/broker.conf index 2f25984e5f203..523f7ad704c3c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -386,6 +386,12 @@ retentionCheckIntervalInSeconds=120 # Use 0 or negative number to disable the check maxNumPartitionsPerPartitionedTopic=0 +# There are two policies when zookeeper session expired happens, "shutdown" and "reconnect". +# If uses "shutdown" policy, shutdown the broker when zookeeper session expired happens. +# If uses "reconnect" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper. +# Node: the "reconnect" policy is an experiment feature +zookeeperSessionExpiredPolicy=shutdown + # Enable or disable system topic systemTopicEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3b58ff7a61529..b91fe0a12f9af 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -707,6 +707,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int maxNumPartitionsPerPartitionedTopic = 0; + @FieldContext( + doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n" + + " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n" + + " If uses \"reconnect\" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper." + ) + private String zookeeperSessionExpiredPolicy = "shutdown"; + /**** --- Messaging Protocols --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java new file mode 100644 index 0000000000000..e6541939d7dec --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; + +/** + * Handlers for broker service to handle Zookeeper session expired + */ +public class ZookeeperSessionExpiredHandlers { + + public static final String SHUTDOWN_POLICY = "shutdown"; + public static final String RECONNECT_POLICY = "reconnect"; + + public static ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired(ShutdownService shutdownService) { + return new ShutDownWhenSessionExpired(shutdownService); + } + + public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) { + return new ReconnectWhenSessionExpired(pulsarService, shutdownService); + } + + // Shutdown the messaging service when Zookeeper session expired. + public static class ShutDownWhenSessionExpired implements ZookeeperSessionExpiredHandler { + + private final ShutdownService shutdownService; + private ZooKeeperSessionWatcher watcher; + + public ShutDownWhenSessionExpired(ShutdownService shutdownService) { + this.shutdownService = shutdownService; + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + + @Override + public void onSessionExpired() { + this.watcher.close(); + this.shutdownService.shutdown(-1); + } + } + + // Reconnect to the zookeeper server and re-register ownership cache to avoid ownership change. + public static class ReconnectWhenSessionExpired implements ZookeeperSessionExpiredHandler { + + private final PulsarService pulsarService; + private ZooKeeperSessionWatcher watcher; + private final ShutdownService shutdownService; + + public ReconnectWhenSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) { + this.pulsarService = pulsarService; + this.shutdownService = shutdownService; + } + + @Override + public void onSessionExpired() { + if (this.pulsarService.getNamespaceService() == null) { + this.watcher.close(); + this.shutdownService.shutdown(-1); + } + this.pulsarService.getNamespaceService().registerOwnedBundles(); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 689b0a473600a..b2d5b7abf400f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -132,6 +132,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl; import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; +import org.apache.pulsar.ZookeeperSessionExpiredHandlers; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -417,7 +419,15 @@ public void start() throws PulsarServerException { // Now we are ready to start services localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis()); - localZooKeeperConnectionProvider.start(shutdownService); + ZookeeperSessionExpiredHandler sessionExpiredHandler = null; + if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) { + sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, shutdownService); + } else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) { + sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(shutdownService); + } else { + throw new IllegalArgumentException("Invalid zookeeper session expired policy " + config.getZookeeperSessionExpiredPolicy()); + } + localZooKeeperConnectionProvider.start(sessionExpiredHandler); // Initialize and start service to access configuration repository. this.startZkCacheService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 943761e4bfc52..e917db7948232 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -74,6 +74,7 @@ import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1267,4 +1268,27 @@ public boolean registerSLANamespace() throws PulsarServerException { } return isNameSpaceRegistered; } + + public void registerOwnedBundles() { + List ownedBundles = new ArrayList<>(ownershipCache.getOwnedBundles().values()); + ownershipCache.invalidateLocalOwnerCache(); + ownedBundles.forEach(ownedBundle -> { + String path = ServiceUnitZkUtils.path(ownedBundle.getNamespaceBundle()); + try { + if (!pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(path)) { + ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle()); + } + } catch (Exception e) { + try { + ownedBundle.handleUnloadRequest(pulsar, 5, TimeUnit.MINUTES); + } catch (IllegalStateException ex) { + // The owned bundle is not in active state. + } catch (Exception ex) { + LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!", + ownedBundle.getNamespaceBundle(), ex); + pulsar.getShutdownService().shutdown(-1); + } + } + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 7acd5f82299b4..3d09b969004b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -372,6 +372,10 @@ public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws E } } + public void invalidateLocalOwnerCache() { + this.ownedBundlesCache.synchronous().invalidateAll(); + } + public NamespaceEphemeralData getSelfOwnerInfo() { return selfOwnerInfo; } diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java index 66341fd6aab56..56332e7316fd4 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java @@ -34,6 +34,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,17 +68,33 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke int zookeeperSessionTimeoutMs) throws Exception { localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers, zookeeperSessionTimeoutMs); - localZkConnectionSvc.start(exitCode -> { - log.error("Shutting down ZK sessions: {}", exitCode); + localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + log.error("Shutting down ZK sessions: {}", -1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } }); this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor); - localZkConnectionSvc.start(exitCode -> { - try { - localZkCache.getZooKeeper().close(); - } catch (InterruptedException e) { - log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e); + localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + try { + localZkCache.getZooKeeper().close(); + } catch (InterruptedException e) { + log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e); + } + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + } }); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java index 6334c29d53fa7..e4be27e53a9d6 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java @@ -59,7 +59,7 @@ public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, S this.zkSessionTimeoutMillis = zkSessionTimeoutMillis; } - public void start(ShutdownService shutdownService) throws IOException { + public void start(ZookeeperSessionExpiredHandler sessionExpiredHandler) throws IOException { // Connect to local ZK CompletableFuture zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite, (int) zkSessionTimeoutMillis); @@ -67,7 +67,7 @@ public void start(ShutdownService shutdownService) throws IOException { try { localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS); localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis, - shutdownService); + sessionExpiredHandler); localZooKeeperSessionWatcher.start(); localZooKeeper.register(localZooKeeperSessionWatcher); } catch (Exception e) { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 2688c10c40fd3..ff132f27b5a0a 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -34,12 +34,14 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.pulsar.common.util.FutureUtil; @@ -49,6 +51,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; @@ -503,4 +506,49 @@ public void stop() { this.backgroundExecutor.shutdown(); } + + public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException { + final CountDownLatch prevNodeLatch = new CountDownLatch(1); + Watcher zkPrevRegNodewatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // Check for prev znode deletion. Connection expiration is + // not handling, since bookie has logic to shutdown. + if (EventType.NodeDeleted == event.getType()) { + prevNodeLatch.countDown(); + } + } + }; + try { + Stat stat = getZooKeeper().exists(regPath, zkPrevRegNodewatcher); + if (null != stat) { + // if the ephemeral owner isn't current zookeeper client + // wait for it to be expired. + if (stat.getEphemeralOwner() != getZooKeeper().getSessionId()) { + log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:" + + " {} ms for znode deletion", regPath, getZooKeeper().getSessionTimeout()); + // waiting for the previous bookie reg znode deletion + if (!prevNodeLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS)) { + throw new NodeExistsException(regPath); + } else { + return false; + } + } + return true; + } else { + return false; + } + } catch (KeeperException ke) { + log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke); + throw new IOException("ZK exception checking and wait ephemeral znode " + + regPath + " expired", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie); + throw new IOException("Interrupted checking and wait ephemeral znode " + + regPath + " expired", ie); + } + } + + private static Logger log = LoggerFactory.getLogger(ZooKeeperCache.class); } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java index 5bd93264eea13..fa70c9ea47529 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java @@ -54,7 +54,7 @@ default void run() { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSessionWatcher.class); - private final ShutdownService shutdownService; + private final ZookeeperSessionExpiredHandler sessionExpiredHandler; private final ZooKeeper zk; // Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout) private final long monitorTimeoutMillis; @@ -68,11 +68,12 @@ default void run() { private volatile boolean zkOperationCompleted = false; private ScheduledFuture task; - public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ShutdownService shutdownService) { + public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ZookeeperSessionExpiredHandler sessionExpiredHandler) { this.zk = zk; this.monitorTimeoutMillis = zkSessionTimeoutMillis * 5 / 6; this.tickTimeMillis = zkSessionTimeoutMillis / 15; - this.shutdownService = shutdownService; + this.sessionExpiredHandler = sessionExpiredHandler; + this.sessionExpiredHandler.setWatcher(this); } public void start() { @@ -100,9 +101,7 @@ public void process(WatchedEvent event) { case None: if (eventState == Watcher.Event.KeeperState.Expired) { LOG.error("ZooKeeper session already expired, invoking shutdown"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + sessionExpiredHandler.onSessionExpired(); } break; default: @@ -151,10 +150,8 @@ public synchronized void run() { keeperState = Watcher.Event.KeeperState.Disconnected; } if (keeperState == Watcher.Event.KeeperState.Expired) { - LOG.error("zoo keeper session expired, invoking shutdown service"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + LOG.error("zookeeper session expired, invoking shutdown service"); + sessionExpiredHandler.onSessionExpired(); } else if (keeperState == Watcher.Event.KeeperState.Disconnected) { if (disconnectedAt == 0) { // this is the first disconnect, we should monitor the time out from now, so we record the time of @@ -166,9 +163,7 @@ public synchronized void run() { - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt); if (timeRemainingMillis <= 0) { LOG.error("timeout expired for reconnecting, invoking shutdown service"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + sessionExpiredHandler.onSessionExpired(); } else { LOG.warn("zoo keeper disconnected, waiting to reconnect, time remaining = {} seconds", TimeUnit.MILLISECONDS.toSeconds(timeRemainingMillis)); @@ -189,5 +184,6 @@ public void close() { if (scheduler != null) { scheduler.shutdownNow(); } + shuttingDown = true; } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java new file mode 100644 index 0000000000000..fc2f180660946 --- /dev/null +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.zookeeper; + +/** + * Handler interface on Zookeeper session expired + */ +public interface ZookeeperSessionExpiredHandler { + + /** + * Signal when zookeeper session is expired. + */ + void onSessionExpired(); + + void setWatcher(ZooKeeperSessionWatcher watcher); +} diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java index 4c162f666dce7..1dbb2bee2e312 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java @@ -35,7 +35,16 @@ public void testSimpleZooKeeperConnection() throws Exception { MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl(); LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService( mockZkClientFactory, "dummy", 1000); - localZkConnectionService.start(null); + localZkConnectionService.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } + }); // Get ZooKeeper client MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper(); @@ -91,7 +100,16 @@ public void testSimpleZooKeeperConnectionFail() throws Exception { LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService( new ZookeeperClientFactoryImpl(), "dummy", 1000); try { - localZkConnectionService.start(null); + localZkConnectionService.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } + }); fail("should fail"); } catch (Exception e) { assertTrue(e.getMessage().contains("Failed to establish session with local ZK")); diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java index 9c44103e9bb11..5b49b2b781261 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertTrue; import org.apache.zookeeper.KeeperException.Code; -import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; @@ -55,7 +54,20 @@ public int getExitCode() { void setup() { zkClient = MockZooKeeper.newInstance(); shutdownService = new MockShutdownService(); - sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, shutdownService); + sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, new ZookeeperSessionExpiredHandler() { + + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); } @AfterMethod @@ -113,8 +125,21 @@ public void testProcessResultNoNode() { } @Test - public void testRun1() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, shutdownService); + void testRun1() throws Exception { + ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, + new ZookeeperSessionExpiredHandler() { + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); sessionWatcherZkNull.run(); assertFalse(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected); @@ -123,8 +148,22 @@ public void testRun1() throws Exception { } @Test - public void testRun2() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, shutdownService); + void testRun2() throws Exception { + ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, + new ZookeeperSessionExpiredHandler() { + + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); sessionWatcherZkNull.run(); assertTrue(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);