From 457ac6006c8a613ba78adaded680b76f56451802 Mon Sep 17 00:00:00 2001 From: Vaibhav Maheshwari Date: Mon, 2 Aug 2021 22:50:58 -0700 Subject: [PATCH] Handle the new session after session expiry (#770) This is the final change to handle new session after session expiry. In this change, we have re-initialized all the local states, listeners, event threads and made the node re-join the cluster. --- .../datastream/server/TestCoordinator.java | 39 +++++- .../datastream/server/Coordinator.java | 22 ++- .../datastream/server/CoordinatorConfig.java | 7 + .../datastream/server/zk/ZkAdapter.java | 128 ++++++++++++++---- .../datastream/server/zk/TestZkAdapter.java | 101 ++++++++++++-- 5 files changed, 254 insertions(+), 43 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 69ad874d5..a85b6d999 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -2882,11 +2882,29 @@ public void testCoordinatorLeaderCleanupTasksPostElection() throws Exception { @Test public void testOnSessionExpired() throws Exception { + testOnSessionExpired(false); + } + + @Test + public void testOnSessionExpiredHandleNewSession() throws Exception { + testOnSessionExpired(true); + } + + void testOnSessionExpired(boolean handleNewSession) throws DatastreamException, InterruptedException { String testCluster = "testCoordinationSmoke3"; String testConnectorType = "testConnectorType"; String datastreamName = "datastreamNameSessionExpired"; - Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster); + Properties props = new Properties(); + props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster); + props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString); + props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT)); + props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT)); + props.put(CoordinatorConfig.CONFIG_REINIT_ON_NEW_ZK_SESSION, String.valueOf(handleNewSession)); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + _cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster); + Coordinator instance1 = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props); instance1.addTransportProvider(DummyTransportProviderAdminFactory.PROVIDER_NAME, new DummyTransportProviderAdminFactory().createTransportProviderAdmin( DummyTransportProviderAdminFactory.PROVIDER_NAME, new Properties())); @@ -2897,20 +2915,29 @@ public void testOnSessionExpired() throws Exception { new SourceBasedDeduper(), null); instance1.start(); - ZkClient zkClient = new ZkClient(_zkConnectionString); - DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName); + ZkClient zkClient1 = new ZkClient(_zkConnectionString); + DatastreamTestUtils.createAndStoreDatastreams(zkClient1, testCluster, testConnectorType, datastreamName); // verify the assignment assertConnectorAssignment(connector1, WAIT_TIMEOUT_MS, datastreamName); + zkClient.delete(KeyBuilder.liveInstance(testCluster, "0000000000")); instance1.onSessionExpired(); - PollUtils.poll(() -> { - return connector1._tasks.size() == 0; - }, 1000, WAIT_TIMEOUT_MS); + PollUtils.poll(() -> connector1._tasks.size() == 0, 1000, WAIT_TIMEOUT_MS); Assert.assertEquals(instance1.getDatastreamTasks().size(), 0); Thread t = instance1.getEventThread(); Assert.assertFalse(t != null && t.isAlive()); Assert.assertTrue(PollUtils.poll(instance1::isZkSessionExpired, 100, 30000)); verify(mockStrategy, times(1)).cleanupStrategy(); + + if (handleNewSession) { + instance1.onNewSession(); + PollUtils.poll(() -> connector1._tasks.size() == 1, 1000, WAIT_TIMEOUT_MS); + Assert.assertEquals(instance1.getDatastreamTasks().size(), 1); + t = instance1.getEventThread(); + Assert.assertTrue(t != null && t.isAlive()); + Assert.assertTrue(PollUtils.poll(() -> instance1.getIsLeader().getAsBoolean(), 100, 30000)); + } + instance1.stop(); instance1.getDatastreamCache().getZkclient().close(); } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 3e65fe67a..43c525784 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -119,9 +119,10 @@ * Coordinator Connector * * ┌──────────────┐ ┌─────────────────────────────────────────┐ ┌─────────────────┐ - * │ │ │ │ │ │ - * │ │ │ │ │ │ * │ │ │ ┌──────────┐ ┌────────────────┐ │ │ │ + * │ │ │ | |──▶ onNewSession │ │ | │ + * │ │ │ │ │ └────────────────┘ │ │ │ + * │ │ │ | | ┌────────────────┐ │ │ │ * │ │ │ │ZkAdapter ├──▶ onBecomeLeader │ │ │ │ * │ │ │ │ │ └────────────────┘ │ │ │ * │ ├───────┼─▶ │ ┌──────────────────┐ │ │ │ @@ -248,7 +249,7 @@ ZkAdapter createZkAdapter() { public void start() { _log.info("Starting coordinator"); startEventThread(); - _adapter.connect(); + _adapter.connect(_config.getReinitOnNewZkSession()); for (String connectorType : _connectors.keySet()) { ConnectorInfo connectorInfo = _connectors.get(connectorType); @@ -530,6 +531,21 @@ boolean isZkSessionExpired() { return _zkSessionExpired; } + @Override + public void onNewSession() { + createEventThread(); + startEventThread(); + _adapter.connect(true); + // ensure it doesn't miss any assignment created + _eventQueue.put(CoordinatorEvent.createHandleAssignmentChangeEvent()); + + // Queue up one heartbeat per period with a initial delay of 3 periods + _executor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT), + _heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS); + + _zkSessionExpired = false; + } + private void getAssignmentsFuture(List> assignmentChangeFutures, Instant start) throws TimeoutException, InterruptedException { for (Future assignmentChangeFuture : assignmentChangeFutures) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java index 070662150..e07a1ae45 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java @@ -29,6 +29,7 @@ public final class CoordinatorConfig { public static final String CONFIG_ZK_CLEANUP_ORPHAN_CONNECTOR_TASK_LOCK = PREFIX + "zkCleanUpOrphanConnectorTaskLock"; public static final String CONFIG_MAX_DATASTREAM_TASKS_PER_INSTANCE = PREFIX + "maxDatastreamTasksPerInstance"; public static final String CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP = PREFIX + "performPreAssignmentCleanup"; + public static final String CONFIG_REINIT_ON_NEW_ZK_SESSION = PREFIX + "reinitOnNewZKSession"; private final String _cluster; private final String _zkAddress; @@ -44,6 +45,7 @@ public final class CoordinatorConfig { private final boolean _zkCleanUpOrphanConnectorTaskLock; private final int _maxDatastreamTasksPerInstance; private final boolean _performPreAssignmentCleanup; + private final boolean _reinitOnNewZkSession; /** * Construct an instance of CoordinatorConfig @@ -65,6 +67,7 @@ public CoordinatorConfig(Properties config) { _zkCleanUpOrphanConnectorTaskLock = _properties.getBoolean(CONFIG_ZK_CLEANUP_ORPHAN_CONNECTOR_TASK_LOCK, false); _maxDatastreamTasksPerInstance = _properties.getInt(CONFIG_MAX_DATASTREAM_TASKS_PER_INSTANCE, 0); _performPreAssignmentCleanup = _properties.getBoolean(CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP, false); + _reinitOnNewZkSession = _properties.getBoolean(CONFIG_REINIT_ON_NEW_ZK_SESSION, false); } public Properties getConfigProperties() { @@ -118,4 +121,8 @@ public boolean getPerformPreAssignmentCleanup() { public long getDebounceTimerMs() { return _debounceTimerMs; } + + public boolean getReinitOnNewZkSession() { + return _reinitOnNewZkSession; + } } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index 952b7774a..adc8b7bbf 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -139,7 +139,7 @@ public class ZkAdapter { private String _currentSubscription = null; private ZkLeaderElectionListener _leaderElectionListener = null; - private ZkBackedTaskListProvider _assignmentList = null; + private ZkBackedTaskListProvider _assignmentListProvider = null; private ZkStateChangeListener _stateChangeListener = null; // only the leader should maintain this list and listen to the changes of live instances @@ -159,6 +159,8 @@ public class ZkAdapter { // object to synchronize zk session handling states private final Object _zkSessionLock = new Object(); + private boolean _reinitOnNewSession = false; + /** * Constructor * @param zkServers ZooKeeper server address to connect to @@ -229,7 +231,7 @@ public void disconnect() { } catch (ZkException zke) { // do nothing, best effort clean up } finally { - closeZkListener(true); + closeZkListeners(true); _zkclient.close(); _zkclient = null; _leaderElectionListener = null; @@ -248,10 +250,32 @@ ZkClient createZkClient() { * the actions that need to be taken with them, which are implemented in the Coordinator class */ public void connect() { - disconnect(); // Guard against leaking an existing zookeeper session - _zkclient = createZkClient(); - _stateChangeListener = new ZkStateChangeListener(); - _leaderElectionListener = new ZkLeaderElectionListener(); + connect(false); + } + + /** + * Connect the adapter so that it can connect and bridge events between ZooKeeper changes and + * the actions that need to be taken with them, which are implemented in the Coordinator class + * + * @param reinitOnNewSession re-initialize the object on new session after session expiry + */ + public void connect(boolean reinitOnNewSession) { + _reinitOnNewSession = reinitOnNewSession; + if (_zkclient == null) { + _zkclient = createZkClient(); + } + + if (_stateChangeListener == null) { + _stateChangeListener = new ZkStateChangeListener(); + } + + if (_leaderElectionListener == null) { + _leaderElectionListener = new ZkLeaderElectionListener(); + } + + if (_liveInstancesProvider == null) { + _liveInstancesProvider = new ZkBackedLiveInstanceListProvider(); + } // create a globally unique instance name and create a live instance node in ZooKeeper _instanceName = createLiveInstanceNode(); @@ -260,7 +284,9 @@ public void connect() { // both leader and follower needs to listen to its own instance change // under /{cluster}/instances/{instance} - _assignmentList = new ZkBackedTaskListProvider(_cluster, _instanceName); + if (_assignmentListProvider == null) { + _assignmentListProvider = new ZkBackedTaskListProvider(_cluster, _instanceName); + } // start with follower state, then join leader election onBecomeFollower(); @@ -295,11 +321,15 @@ private void onBecomeFollower() { LOG.info("Instance " + _instanceName + " becomes follower"); - closeZkListener(false); + closeZkListeners(false); _isLeader = false; } - private void closeZkListener(boolean isDisconnect) { + private void closeZkListeners(boolean isDisconnect) { + closeZkListeners(isDisconnect, false); + } + + private void closeZkListeners(boolean isDisconnect, boolean isSessionExpired) { // Clean the following listeners only during zookeeper disconnect if (isDisconnect) { @@ -308,9 +338,14 @@ private void closeZkListener(boolean isDisconnect) { _stateChangeListener = null; } - if (_assignmentList != null) { - _assignmentList.close(); - _assignmentList = null; + // unsubscribe any other left subscription. + _zkclient.unsubscribeAll(); + } + + if (isDisconnect || isSessionExpired) { + if (_assignmentListProvider != null) { + _assignmentListProvider.close(); + _assignmentListProvider = null; } if (_currentSubscription != null) { @@ -318,8 +353,10 @@ private void closeZkListener(boolean isDisconnect) { _currentSubscription = null; } - // unsubscribe any other left subscription. - _zkclient.unsubscribeAll(); + if (_liveInstancesProvider != null) { + _liveInstancesProvider.close(); + _liveInstancesProvider = null; + } } if (_datastreamList != null) { @@ -327,11 +364,6 @@ private void closeZkListener(boolean isDisconnect) { _datastreamList = null; } - if (_liveInstancesProvider != null) { - _liveInstancesProvider.close(); - _liveInstancesProvider = null; - } - if (_targetAssignmentProvider != null) { _targetAssignmentProvider.close(); _targetAssignmentProvider = null; @@ -377,8 +409,7 @@ private void joinLeaderElection() { if (index < 0) { // only when the ZooKeeper session already expired by the time this adapter joins for leader election. // mostly because the zkclient session expiration timeout. - LOG.error("Failed to join leader election. Try reconnect the zookeeper"); - connect(); + LOG.error("Failed to join leader election. wait for the new session to be established"); return; } @@ -1415,7 +1446,6 @@ public void releaseTask(DatastreamTaskImpl task) { task.getDatastreamTaskName()); return; } - _zkclient.delete(lockPath); LOG.info("{} successfully released the lock on {}-{}/{}", _instanceName, task.getConnectorType(), task.getTaskPrefix(), task.getDatastreamTaskName()); @@ -1498,6 +1528,11 @@ public interface ZkAdapterListener { * onSessionExpired is called when the zookeeper session expires. */ void onSessionExpired(); + + /** + * onNewSession is called when the zookeeper session is established after expiry. + */ + void onNewSession(); } /** @@ -1797,6 +1832,9 @@ public void handleStateChanged(Watcher.Event.KeeperState state) { public void handleNewSession() { synchronized (_zkSessionLock) { LOG.info("ZkStateChangeListener::A new session has been established."); + if (_reinitOnNewSession) { + onNewSession(); + } } } @@ -1816,20 +1854,28 @@ private void scheduleExpiryTimerAfterSessionTimeout() { } } + @VisibleForTesting + void onNewSession() { + if (_listener != null) { + _listener.onNewSession(); + } + } + @VisibleForTesting void onSessionExpired() { synchronized (_zkSessionLock) { LOG.error("Zookeeper session expired."); // cancel the lock clean up _orphanLockCleanupFuture.cancel(true); + _orphanLockCleanupFuture = CompletableFuture.completedFuture("completed"); + closeZkListeners(false, true); onBecomeFollower(); if (_listener != null) { _listener.onSessionExpired(); } - // currently it will try to disconnect and fail. TODO: fix the connect and listen to handleNewSession. - // Temporary hack to kill the zkEventThread at this point, to ensure that the connection to zookeeper - // is not re-initialized till reconnect path is fixed. - disconnect(); + if (!_reinitOnNewSession) { + disconnect(); + } } } @@ -1837,4 +1883,34 @@ void onSessionExpired() { long getSessionId() { return _zkclient.getSessionId(); } + + @VisibleForTesting + ZkLeaderElectionListener getLeaderElectionListener() { + return _leaderElectionListener; + } + + @VisibleForTesting + ZkBackedTaskListProvider getAssignmentListProvider() { + return _assignmentListProvider; + } + + @VisibleForTesting + ZkStateChangeListener getStateChangeListener() { + return _stateChangeListener; + } + + @VisibleForTesting + ZkBackedLiveInstanceListProvider getLiveInstancesProvider() { + return _liveInstancesProvider; + } + + @VisibleForTesting + ZkBackedDMSDatastreamList getDatastreamList() { + return _datastreamList; + } + + @VisibleForTesting + ZkTargetAssignmentProvider getTargetAssignmentProvider() { + return _targetAssignmentProvider; + } } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java index 24a643df6..69365b438 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java @@ -17,7 +17,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -42,7 +41,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** @@ -53,7 +55,6 @@ public class TestZkAdapter { private static final int ZK_WAIT_IN_MS = 500; private static final long ZK_DEBOUNCE_TIMER_MS = 1000; - private final String defaultTransportProviderName = "test"; private EmbeddedZookeeper _embeddedZookeeper; private String _zkConnectionString; @@ -715,11 +716,13 @@ private ZkClientInterceptingAdapter createInterceptingZkAdapter(String testClust private static class ZkClientInterceptingAdapter extends ZkAdapter { private ZkClient _zkClient; + private long _sleepMs; public ZkClientInterceptingAdapter(String zkConnectionString, String testCluster, String defaultTransportProviderName, int defaultSessionTimeoutMs, int defaultConnectionTimeoutMs, long debounceTimerMs, ZkAdapterListener listener) { super(zkConnectionString, testCluster, defaultTransportProviderName, defaultSessionTimeoutMs, defaultConnectionTimeoutMs, debounceTimerMs, listener); + _sleepMs = defaultSessionTimeoutMs; } @Override @@ -734,13 +737,24 @@ public ZkClient getZkClient() { } @Test - public void testZookeeperSessionExpiry() throws InterruptedException { + public void testZookeeperSessionExpiryDontReinitNewSession() throws InterruptedException { + testZookeeperSessionExpiry(false); + } + @Test + public void testZookeeperSessionExpiryReinitNewSession() throws InterruptedException { + testZookeeperSessionExpiry(true); + } + + private void testZookeeperSessionExpiry(boolean reinitNewSession) throws InterruptedException { String testCluster = "testDeleteTaskWithPrefix"; String connectorType = "connectorType"; Duration timeout = Duration.ofMinutes(1); ZkClientInterceptingAdapter adapter = createInterceptingZkAdapter(testCluster, 5000, ZK_DEBOUNCE_TIMER_MS); - adapter.connect(); + adapter.connect(reinitNewSession); + + ZkClientInterceptingAdapter adapter2 = createInterceptingZkAdapter(testCluster, 5000, ZK_DEBOUNCE_TIMER_MS); + adapter2.connect(reinitNewSession); DatastreamTaskImpl task = new DatastreamTaskImpl(); task.setId("3"); @@ -753,10 +767,81 @@ public void testZookeeperSessionExpiry() throws InterruptedException { LOG.info("Acquire from instance1 should succeed"); Assert.assertTrue(expectException(() -> task.acquire(timeout), false)); + Assert.assertTrue(adapter.isLeader()); + Assert.assertFalse(adapter2.isLeader()); + verifyZkListenersOfLeader(adapter); + verifyZkListenersOfFollower(adapter2); + simulateSessionExpiration(adapter); Thread.sleep(5000); - Mockito.verify(adapter, Mockito.times(1)).onSessionExpired(); + verify(adapter, times(1)).onSessionExpired(); + if (!reinitNewSession) { + verifyZkListenersAfterDisconnect(adapter); + } else { + verifyZkListenersAfterExpiredSession(adapter); + } + Assert.assertFalse(adapter.isLeader()); + Assert.assertTrue(PollUtils.poll(adapter2::isLeader, 100, ZK_WAIT_IN_MS)); + verifyZkListenersOfLeader(adapter2); + + Assert.assertTrue(PollUtils.poll(adapter2::isLeader, 100, ZK_WAIT_IN_MS)); + Assert.assertTrue(adapter2.isLeader()); + + //This connect is called from the coordinator code, calling it explicitly here for testing. + if (reinitNewSession) { + verify(adapter, times(1)).onNewSession(); + Assert.assertFalse(adapter.isLeader()); + adapter.connect(); + verifyZkListenersOfFollower(adapter); + } + + adapter2.disconnect(); + verifyZkListenersAfterDisconnect(adapter2); + + if (reinitNewSession) { + Assert.assertTrue(PollUtils.poll(adapter::isLeader, 100, ZK_WAIT_IN_MS)); + verifyZkListenersOfLeader(adapter); + + adapter.disconnect(); + verifyZkListenersAfterDisconnect(adapter); + } + } + + private void verifyZkListenersAfterDisconnect(ZkClientInterceptingAdapter adapter) { + Assert.assertNull(adapter.getLeaderElectionListener()); + Assert.assertNull(adapter.getAssignmentListProvider()); + Assert.assertNull(adapter.getStateChangeListener()); + Assert.assertNull(adapter.getLiveInstancesProvider()); + Assert.assertNull(adapter.getDatastreamList()); + Assert.assertNull(adapter.getTargetAssignmentProvider()); + } + + private void verifyZkListenersAfterExpiredSession(ZkClientInterceptingAdapter adapter) { + Assert.assertNotNull(adapter.getLeaderElectionListener()); + Assert.assertNull(adapter.getAssignmentListProvider()); + Assert.assertNotNull(adapter.getStateChangeListener()); + Assert.assertNull(adapter.getLiveInstancesProvider()); + Assert.assertNull(adapter.getDatastreamList()); + Assert.assertNull(adapter.getTargetAssignmentProvider()); + } + + private void verifyZkListenersOfFollower(ZkClientInterceptingAdapter adapter2) { + Assert.assertNotNull(adapter2.getLeaderElectionListener()); + Assert.assertNotNull(adapter2.getAssignmentListProvider()); + Assert.assertNotNull(adapter2.getStateChangeListener()); + Assert.assertNotNull(adapter2.getLiveInstancesProvider()); + Assert.assertNull(adapter2.getDatastreamList()); + Assert.assertNull(adapter2.getTargetAssignmentProvider()); + } + + private void verifyZkListenersOfLeader(ZkClientInterceptingAdapter adapter) { + Assert.assertNotNull(adapter.getLeaderElectionListener()); + Assert.assertNotNull(adapter.getAssignmentListProvider()); + Assert.assertNotNull(adapter.getStateChangeListener()); + Assert.assertNotNull(adapter.getLiveInstancesProvider()); + Assert.assertNotNull(adapter.getDatastreamList()); + Assert.assertNotNull(adapter.getTargetAssignmentProvider()); } @Test @@ -850,7 +935,7 @@ public void testDeleteTasksWithPrefix() throws InterruptedException { updateInstanceAssignment(adapter, adapter.getInstanceName(), tasks); adapter.acquireTask(lockTask, Duration.ofSeconds(2)); - ZkClient zkClient = Mockito.spy(adapter.getZkClient()); + ZkClient zkClient = spy(adapter.getZkClient()); // Delete a few nodes for (int j = 0; j < 8; j++) { @@ -861,8 +946,8 @@ public void testDeleteTasksWithPrefix() throws InterruptedException { // Not the most ideal way to test the issue of not being able to delete when the top level zk node is full, // but creating EmbeddedZK with smaller jute.maxbuffer size to actually testing filling a directory to // max requires setting system property which will interfere with any other parallel test using EmbeddedZk. - Mockito.verify(zkClient, Mockito.never()).getChildren(any()); - Mockito.verify(zkClient, Mockito.never()).getChildren(any(), anyBoolean()); + verify(zkClient, never()).getChildren(any()); + verify(zkClient, never()).getChildren(any(), anyBoolean()); List leftOverTasks = zkClient.getChildren(KeyBuilder.connector(testCluster, connectorType)); Assert.assertEquals(leftOverTasks.size(), 3);