diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index c7368e8ff..ede9982f7 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -4002,7 +4002,10 @@ protected synchronized void handleEvent(CoordinatorEvent event) { shadowCoordinatorQueue.poll(); } - // As we expect the reattempt event to added to the front of the queue, the front of the queue should be the same. + // Take out the initial leaderDoAssignmentForNewlyElectedLeader + shadowCoordinatorQueue.poll(); + + // As we expect the reattempt event to be added to the front, the front of the queue should now be the same. Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader); } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 5e84241b3..6f00e2725 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -1527,7 +1527,7 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) { _leaderDoAssignmentScheduled.set(true); // scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt. _leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> { - _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false); + _eventQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader)); _leaderDoAssignmentScheduled.set(false); }, 0, TimeUnit.MILLISECONDS); } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java index a234ffe25..6720e8247 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorEventBlockingQueue.java @@ -80,12 +80,20 @@ public synchronized void put(CoordinatorEvent event) { put(event, true); } + /** + * Add a single event to the queue. Adds the event to the front of the queue. + * @param event CoordinatorEvent event to add to the queue + */ + public synchronized void putFirst(CoordinatorEvent event) { + put(event, false); + } + /** * Add a single event to the queue, de-duping events with the same name and same metadata. * @param event CoordinatorEvent event to add to the queue * @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. */ - public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { + private synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType()); if (_eventSet.contains(event)) { _counter.inc(); // count duplicate event