Skip to content

Commit

Permalink
Modify the public facing api for putting events in the front of the C…
Browse files Browse the repository at this point in the history
…oordinator Queue
  • Loading branch information
Shrinand Thakkar committed Aug 3, 2023
1 parent 625fb0b commit e122778
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4002,7 +4002,10 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
shadowCoordinatorQueue.poll();
}

// As we expect the reattempt event to added to the front of the queue, the front of the queue should be the same.
// Take out the initial leaderDoAssignmentForNewlyElectedLeader
shadowCoordinatorQueue.poll();

// As we expect the reattempt event to be added to the front, the front of the queue should now be the same.
Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) {
_leaderDoAssignmentScheduled.set(true);
// scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt.
_leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> {
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false);
_eventQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader));
_leaderDoAssignmentScheduled.set(false);
}, 0, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ public synchronized void put(CoordinatorEvent event) {
put(event, true);
}

/**
* Add a single event to the queue. Adds the event to the front of the queue.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void putFirst(CoordinatorEvent event) {
put(event, false);
}

/**
* Add a single event to the queue, de-duping events with the same name and same metadata.
* @param event CoordinatorEvent event to add to the queue
* @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise.
*/
public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) {
private synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) {
LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType());
if (_eventSet.contains(event)) {
_counter.inc(); // count duplicate event
Expand Down

0 comments on commit e122778

Please sign in to comment.