From 625fb0b4f9b4af686261c6b7dcb3f6ee27c7e447 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Mon, 24 Jul 2023 09:50:29 -0700 Subject: [PATCH 1/4] Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util --- .../server/CallableCoordinatorForTest.java | 20 ++++++ .../datastream/server/TestCoordinator.java | 67 ++++++++++++++++++- .../datastream/server/Coordinator.java | 10 +-- .../datastream/server/CoordinatorEvent.java | 2 +- .../server/CoordinatorEventBlockingQueue.java | 28 ++++++-- .../StickyPartitionAssignmentStrategy.java | 3 +- .../datastream/server/zk/ZkAdapter.java | 9 +-- .../TestCoordinatorEventBlockingQueue.java | 12 ++-- ...TestStickyPartitionAssignmentStrategy.java | 33 +++++++++ 9 files changed, 158 insertions(+), 26 deletions(-) create mode 100644 datastream-server-restli/src/test/java/com/linkedin/datastream/server/CallableCoordinatorForTest.java diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/CallableCoordinatorForTest.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/CallableCoordinatorForTest.java new file mode 100644 index 000000000..9d0052288 --- /dev/null +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/CallableCoordinatorForTest.java @@ -0,0 +1,20 @@ +/** + * Copyright 2023 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. + * See the NOTICE file in the project root for additional information regarding copyright ownership. + */ +package com.linkedin.datastream.server; + +import java.util.Properties; + +/** + * Callable Coordinator is used for overriding coordinator behaviors for tests + */ +public interface CallableCoordinatorForTest { + /** + * invoking constructor of coordinator with params, + * - datastreamCache to maintain all the datastreams in the cluster. + * - properties to use while creating coordinator. + * */ + Coordinator invoke(CachedDatastreamReader cachedDatastreamReader, Properties properties); +} \ No newline at end of file diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 39b13e3a1..c7368e8ff 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -9,6 +9,7 @@ import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -21,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -136,7 +138,12 @@ private Coordinator createCoordinator(String zkAddr, String cluster) throws Exce } private Coordinator createCoordinator(String zkAddr, String cluster, Properties override) throws Exception { - return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory()); + return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory(), Coordinator::new); + } + + private Coordinator createCoordinator(String zkAddr, String cluster, Properties override, + TransportProviderAdminFactory transportProviderAdminFactory) throws Exception { + return createCoordinator(zkAddr, cluster, override, transportProviderAdminFactory, Coordinator::new); } private Coordinator createCoordinator(String zkAddr, String cluster, Properties override, @@ -163,7 +170,7 @@ protected synchronized void handleEvent(CoordinatorEvent event) { } private Coordinator createCoordinator(String zkAddr, String cluster, Properties override, - TransportProviderAdminFactory transportProviderAdminFactory) throws Exception { + TransportProviderAdminFactory transportProviderAdminFactory, CallableCoordinatorForTest callableCoordinatorForTest) throws Exception { Properties props = new Properties(); props.put(CoordinatorConfig.CONFIG_CLUSTER, cluster); props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr); @@ -172,7 +179,7 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties props.putAll(override); ZkClient client = new ZkClient(zkAddr); _cachedDatastreamReader = new CachedDatastreamReader(client, cluster); - Coordinator coordinator = new Coordinator(_cachedDatastreamReader, props); + Coordinator coordinator = callableCoordinatorForTest.invoke(_cachedDatastreamReader, props); coordinator.addTransportProvider(DummyTransportProviderAdminFactory.PROVIDER_NAME, transportProviderAdminFactory.createTransportProviderAdmin(DummyTransportProviderAdminFactory.PROVIDER_NAME, new Properties())); @@ -3945,6 +3952,60 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith coordinator.getDatastreamCache().getZkclient().close(); } + @Test + public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePath() throws Exception { + String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath"; + String connectorType = "connectorType"; + String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath"; + + Queue shadowCoordinatorQueue = new ArrayDeque<>(); + Properties properties = new Properties(); + Coordinator coordinator = + createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(), + (cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) { + + // This override generates an exception while the newly elected leader performs pre assignment cleanup. + // The exception causes the handleLeaderDoAssignment handler to exit, along with inserting the same event + // in the queue for a reattempt. + @Override + protected void performPreAssignmentCleanup(List datastreamGroups) { + throw new RuntimeException("testing exception path in assignment cleanup routine"); + } + + // This override collects the coordinator queue events in a shadow queue for test purposes. + @Override + protected synchronized void handleEvent(CoordinatorEvent event) { + shadowCoordinatorQueue.add(event); + super.handleEvent(event); + } + }); + TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType); + coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + coordinator.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + + Datastream testDatastream = + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0]; + + coordinator.stop(); + zkClient.close(); + coordinator.getDatastreamCache().getZkclient().close(); + + // This is the event which should be added to the front of the queue once the handler exits on an exception. + CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader = + new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true); + + // while-ing until the newly elected leader performs the handlerLeaderDoAssignment request for the first time. + while (!Objects.equals(shadowCoordinatorQueue.peek(), leaderDoAssignmentForNewlyElectedLeader)) { + shadowCoordinatorQueue.poll(); + } + + // As we expect the reattempt event to added to the front of the queue, the front of the queue should be the same. + Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader); + } + // This helper function helps compare the requesting topics with the topics reflected in the server. private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator, Set requestedThroughputViolatingTopics) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index e03a13708..5e84241b3 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -834,7 +834,8 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx _assignedDatastreamTasks.putAll(currentAssignment.values() .stream() .flatMap(Collection::stream) - .collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity()))); + .collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(), + (existingTask, duplicateTask) -> existingTask))); List newAssignment = new ArrayList<>(_assignedDatastreamTasks.values()); if ((totalTasks - submittedTasks) > 0) { @@ -1524,10 +1525,11 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) { _log.info("Schedule retry for leader assigning tasks"); _metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_DO_ASSIGNMENT_NUM_RETRIES, 1); _leaderDoAssignmentScheduled.set(true); + // scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt. _leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> { - _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader)); + _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false); _leaderDoAssignmentScheduled.set(false); - }, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS); + }, 0, TimeUnit.MILLISECONDS); } @VisibleForTesting @@ -1614,7 +1616,7 @@ private void revokeUnclaimedAssignmentTokens(Map> } } - private void performPreAssignmentCleanup(List datastreamGroups) { + protected void performPreAssignmentCleanup(List datastreamGroups) { // Map between instance to tasks assigned to the instance. Map> previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks(); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEvent.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEvent.java index e6b72536e..0b139dcca 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEvent.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEvent.java @@ -52,7 +52,7 @@ private CoordinatorEvent(EventType eventType) { _eventMetadata = null; } - private CoordinatorEvent(EventType eventType, Object eventMetadata) { + protected CoordinatorEvent(EventType eventType, Object eventMetadata) { _eventType = eventType; _eventMetadata = eventMetadata; } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java index 0c60cfb22..a234ffe25 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java @@ -7,12 +7,12 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; import java.util.HashSet; import java.util.List; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware { static final String GAUGE_KEY = "queuedEvents"; private final Set _eventSet; - private final Queue _eventQueue; + private final Deque _eventQueue; private final DynamicMetricsManager _dynamicMetricsManager; private final Gauge _gauge; private final Counter _counter; @@ -59,7 +59,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware { */ CoordinatorEventBlockingQueue(String key) { _eventSet = new HashSet<>(); - _eventQueue = new LinkedBlockingQueue<>(); + _eventQueue = new LinkedBlockingDeque<>(); _dynamicMetricsManager = DynamicMetricsManager.getInstance(); String prefix = buildMetricName(key); @@ -73,16 +73,30 @@ class CoordinatorEventBlockingQueue implements MetricsAware { /** - * Add a single event to the queue, overwriting events with the same name and same metadata. + * Add a single event to the queue. Defaults to adding the event at the end of the queue. * @param event CoordinatorEvent event to add to the queue */ public synchronized void put(CoordinatorEvent event) { - LOG.info("Queuing event {} to event queue", event.getType()); + put(event, true); + } + + /** + * Add a single event to the queue, de-duping events with the same name and same metadata. + * @param event CoordinatorEvent event to add to the queue + * @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. + */ + public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { + LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType()); if (_eventSet.contains(event)) { _counter.inc(); // count duplicate event } else { // only insert if there isn't an event present in the queue with the same name and same metadata. - boolean result = _eventQueue.offer(event); + boolean result; + if (insertInTheEnd) { + result = _eventQueue.offer(event); + } else { + result = _eventQueue.offerFirst(event); + } if (!result) { return; } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java index 72753a65a..43a9262d2 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java @@ -487,7 +487,8 @@ public Map> getTasksToCleanUp(List Map assignmentsMap = currentAssignment.values() .stream() .flatMap(Collection::stream) - .collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity())); + .collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(), + (existingTask, duplicateTask) -> existingTask)); for (String instance : currentAssignment.keySet()) { // find the dependency tasks which also exist in the assignmentsMap. diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index 5ec856a08..959b5c545 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -759,10 +759,11 @@ public void updateAllAssignmentsAndIssueTokens(Map> private Map> getStoppingDatastreamGroupInstances( List stoppingDatastreamGroups) { Map> currentAssignment = getAllAssignedDatastreamTasks(); - Set stoppingDatastreamTaskPrefixes = stoppingDatastreamGroups.stream(). - map(DatastreamGroup::getTaskPrefix).collect(toSet()); - Map taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream(). - collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity())); + Set stoppingDatastreamTaskPrefixes = + stoppingDatastreamGroups.stream().map(DatastreamGroup::getTaskPrefix).collect(toSet()); + Map taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream() + .collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity(), + (existingDatastreamGroup, duplicateDatastreamGroup) -> existingDatastreamGroup)); Map> stoppingDgInstances = new HashMap<>(); currentAssignment.keySet() diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java index ca07b4993..865dae295 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java @@ -42,20 +42,20 @@ public void resetMetrics() { public void testHappyPath() throws Exception { CoordinatorEventBlockingQueue eventBlockingQueue = new CoordinatorEventBlockingQueue(SIMPLE_NAME); eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false)); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false)); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true), false); + eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false), true); + eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true), false); eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); - eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2")); + eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2"), false); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); Assert.assertEquals(eventBlockingQueue.size(), 5); - Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderDoAssignmentEvent(false)); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2")); Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderDoAssignmentEvent(false)); Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); - Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2")); Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestStickyPartitionAssignmentStrategy.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestStickyPartitionAssignmentStrategy.java index 32d616580..670c2b0fa 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestStickyPartitionAssignmentStrategy.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestStickyPartitionAssignmentStrategy.java @@ -758,6 +758,39 @@ public void testExpectedNumberOfTasks() { Assert.assertEquals(-1, getNumTasksForDatastreamFromZK(ds.get(0).getName())); } + @Test + public void testTasksCleanUpWithDuplicatesAcrossInstances() { + StickyPartitionAssignmentStrategy strategy = + createStickyPartitionAssignmentStrategy(3, 90, true, getZkClient(true), _clusterName); + List datastreams = generateDatastreams("testTasksCleanUpWithDuplicatesAcrossInstances", 1, 3); + + Map> assignment = generateEmptyAssignment(datastreams, 2, 3, true); + + List newPartitions = ImmutableList.of("t-0", "t-1", "tt-0", "tt-1", "ttt-0", "ttt-1", "ttt-2"); + assignment = + strategy.assignPartitions(assignment, new DatastreamGroupPartitionsMetadata(datastreams.get(0), newPartitions)); + + // This following snippet demonstrates a previous leader performing some task movements but got interrupted, OOMed + // or hit session expiry. + // The previous leader was able to add some tasks to the newer instance's assignment, but couldn't remove them from + // previous instance's assignment. + // The next leader should be able to identify and cleanup, even though there'll be duplicate tasks across instances. + DatastreamTask previousTask = null; + for (String instance : assignment.keySet()) { + if (previousTask != null) { + assignment.get(instance).add(previousTask); + } + previousTask = assignment.get(instance).iterator().next(); + } + + try { + Map> taskToCleanup = strategy.getTasksToCleanUp(datastreams, assignment); + Assert.assertEquals(taskToCleanup.size(), 0); + } catch (Exception exception) { + Assert.fail("Received exception while finding tasks to be cleaned up", exception.getCause()); + } + } + private int getNumTasksForDatastreamFromZK(String taskPrefix) { String numTasksPath = KeyBuilder.datastreamNumTasks(_clusterName, taskPrefix); if (!_zkClient.exists(numTasksPath)) { From a7f4e0359ac316c1dc59864daeb0c597cd96b3c0 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Thu, 3 Aug 2023 09:31:05 -0700 Subject: [PATCH 2/4] Modify the public facing api for putting events in the front of the Coordinator Queue --- .../linkedin/datastream/server/TestCoordinator.java | 5 ++++- .../com/linkedin/datastream/server/Coordinator.java | 2 +- .../server/CoordinatorEventBlockingQueue.java | 10 +++++++++- .../server/TestCoordinatorEventBlockingQueue.java | 8 ++++---- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index c7368e8ff..ede9982f7 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -4002,7 +4002,10 @@ protected synchronized void handleEvent(CoordinatorEvent event) { shadowCoordinatorQueue.poll(); } - // As we expect the reattempt event to added to the front of the queue, the front of the queue should be the same. + // Take out the initial leaderDoAssignmentForNewlyElectedLeader + shadowCoordinatorQueue.poll(); + + // As we expect the reattempt event to be added to the front, the front of the queue should now be the same. Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader); } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 5e84241b3..6f00e2725 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -1527,7 +1527,7 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) { _leaderDoAssignmentScheduled.set(true); // scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt. _leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> { - _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false); + _eventQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader)); _leaderDoAssignmentScheduled.set(false); }, 0, TimeUnit.MILLISECONDS); } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java index a234ffe25..6720e8247 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java @@ -80,12 +80,20 @@ public synchronized void put(CoordinatorEvent event) { put(event, true); } + /** + * Add a single event to the queue. Adds the event to the front of the queue. + * @param event CoordinatorEvent event to add to the queue + */ + public synchronized void putFirst(CoordinatorEvent event) { + put(event, false); + } + /** * Add a single event to the queue, de-duping events with the same name and same metadata. * @param event CoordinatorEvent event to add to the queue * @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. */ - public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { + private synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType()); if (_eventSet.contains(event)) { _counter.inc(); // count duplicate event diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java index 865dae295..9fc347d39 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java @@ -42,12 +42,12 @@ public void resetMetrics() { public void testHappyPath() throws Exception { CoordinatorEventBlockingQueue eventBlockingQueue = new CoordinatorEventBlockingQueue(SIMPLE_NAME); eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false)); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true), false); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false), true); - eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true), false); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false)); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); - eventBlockingQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2"), false); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2")); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); eventBlockingQueue.put(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); From e7f3e2db6fc35ff5efc01b3ceaca9931a839e1e4 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Tue, 8 Aug 2023 12:13:18 -0700 Subject: [PATCH 3/4] Better Handling Duplicate putFirst Requests & Added couple more tests --- .../datastream/server/TestCoordinator.java | 65 +++++++++++++++++++ .../datastream/server/Coordinator.java | 5 ++ .../server/CoordinatorEventBlockingQueue.java | 9 +++ .../TestCoordinatorEventBlockingQueue.java | 15 +++++ 4 files changed, 94 insertions(+) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index ede9982f7..b5e242b6c 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -9,6 +9,7 @@ import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +36,7 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.Validate; import org.apache.zookeeper.CreateMode; @@ -4009,6 +4011,69 @@ protected synchronized void handleEvent(CoordinatorEvent event) { Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader); } + @Test + public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation() throws Exception { + String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation"; + String connectorType = "connectorType"; + String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation"; + + // This is the event which should be added to the front of the queue once the handler exits on an exception. + CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader = + new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true); + + List> + shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent = new ArrayList<>(); + + Properties properties = new Properties(); + Coordinator coordinator = + createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(), + (cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) { + + // This override generates an exception while the newly elected leader performs pre assignment cleanup. + // The exception causes the handleLeaderDoAssignment handler to exit, along with inserting the same event + // in the queue for a reattempt. + @Override + protected void performPreAssignmentCleanup(List datastreamGroups) { + throw new RuntimeException("testing exception path in assignment cleanup routine"); + } + + // This override collects the coordinator queue events in a shadow queue for test purposes. + @Override + protected synchronized void handleEvent(CoordinatorEvent event) { + CoordinatorEvent previousHead = peekCoordinatorEventBlockingQueue(); + super.handleEvent(event); + CoordinatorEvent nextHead = peekCoordinatorEventBlockingQueue(); + + // recording previous and new heads of the CoordinatorEventBlockingQueue + if (event.equals(leaderDoAssignmentForNewlyElectedLeader)) { + shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.add( + new AbstractMap.SimpleEntry<>(previousHead, nextHead)); + } + } + }); + TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType); + coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + coordinator.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + + Datastream testDatastream = + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0]; + + coordinator.stop(); + zkClient.close(); + coordinator.getDatastreamCache().getZkclient().close(); + + // Comparing the previous and new head values when the NewLeaderDoAssignmentEvent fails. + IntStream.range(0, 3).forEach(index -> { + Assert.assertNotEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getKey(), + leaderDoAssignmentForNewlyElectedLeader); + Assert.assertEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getValue(), + leaderDoAssignmentForNewlyElectedLeader); + }); + } + // This helper function helps compare the requesting topics with the topics reflected in the server. private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator, Set requestedThroughputViolatingTopics) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 6f00e2725..94ba96bb9 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -2327,6 +2327,11 @@ CoordinatorConfig getConfig() { return _config; } + @VisibleForTesting + CoordinatorEvent peekCoordinatorEventBlockingQueue() { + return _eventQueue.peek(); + } + @VisibleForTesting static String getNumThroughputViolatingTopicsMetricName() { return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM; diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java index 6720e8247..127166d3b 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java @@ -85,6 +85,15 @@ public synchronized void put(CoordinatorEvent event) { * @param event CoordinatorEvent event to add to the queue */ public synchronized void putFirst(CoordinatorEvent event) { + // If the requested event is already in the CoordinatorEventBlockingQueue, it will be removed to prioritize the + // event to be putFirst. + if (_eventSet.contains(event)) { + LOG.info("Prioritizing the event to be putFirst by removing the existing CoordinatorEvent " + event); + // Since the distinct content of the CoordinatorEventBlockingQueue is not anticipated to be extensive, the + // linear complexity removal operation deemed acceptable. + _eventQueue.remove(event); + _eventSet.remove(event); + } put(event, false); } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java index 9fc347d39..2cf83ffd4 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorEventBlockingQueue.java @@ -59,6 +59,21 @@ public void testHappyPath() throws Exception { Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); } + @Test + public void testHappyPathWithDuplicatedPutFirstEventRequests() throws Exception { + CoordinatorEventBlockingQueue eventBlockingQueue = new CoordinatorEventBlockingQueue(SIMPLE_NAME); + eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false)); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); + eventBlockingQueue.putFirst(CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); + eventBlockingQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + Assert.assertEquals(eventBlockingQueue.size(), 4); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderDoAssignmentEvent(true)); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1")); + Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderDoAssignmentEvent(false)); + } + /** * Verify metric registration. */ From b8d71927bc81b118bfbe02878513416aabefafa1 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Tue, 8 Aug 2023 13:45:23 -0700 Subject: [PATCH 4/4] Fixing Flaky Test --- .../java/com/linkedin/datastream/server/TestCoordinator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index b5e242b6c..e8b2445d8 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -4042,6 +4042,7 @@ protected void performPreAssignmentCleanup(List datastreamGroup protected synchronized void handleEvent(CoordinatorEvent event) { CoordinatorEvent previousHead = peekCoordinatorEventBlockingQueue(); super.handleEvent(event); + PollUtils.poll(() -> peekCoordinatorEventBlockingQueue() != null, 50, 1000); CoordinatorEvent nextHead = peekCoordinatorEventBlockingQueue(); // recording previous and new heads of the CoordinatorEventBlockingQueue