-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util #950
Changes from 1 commit
625fb0b
a7f4e03
e7f3e2d
b8d7192
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/** | ||
* Copyright 2023 LinkedIn Corporation. All rights reserved. | ||
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. | ||
* See the NOTICE file in the project root for additional information regarding copyright ownership. | ||
*/ | ||
package com.linkedin.datastream.server; | ||
|
||
import java.util.Properties; | ||
|
||
/** | ||
* Callable Coordinator is used for overriding coordinator behaviors for tests | ||
*/ | ||
public interface CallableCoordinatorForTest { | ||
/** | ||
* invoking constructor of coordinator with params, | ||
* - datastreamCache to maintain all the datastreams in the cluster. | ||
* - properties to use while creating coordinator. | ||
* */ | ||
Coordinator invoke(CachedDatastreamReader cachedDatastreamReader, Properties properties); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -834,7 +834,8 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx | |
_assignedDatastreamTasks.putAll(currentAssignment.values() | ||
.stream() | ||
.flatMap(Collection::stream) | ||
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity()))); | ||
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(), | ||
(existingTask, duplicateTask) -> existingTask))); | ||
List<DatastreamTask> newAssignment = new ArrayList<>(_assignedDatastreamTasks.values()); | ||
|
||
if ((totalTasks - submittedTasks) > 0) { | ||
|
@@ -1524,10 +1525,11 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) { | |
_log.info("Schedule retry for leader assigning tasks"); | ||
_metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_DO_ASSIGNMENT_NUM_RETRIES, 1); | ||
_leaderDoAssignmentScheduled.set(true); | ||
// scheduling LEADER_DO_ASSIGNMENT event instantly to prevent any other event being handled before the reattempt. | ||
_leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> { | ||
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader)); | ||
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader), false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the main change in this PR. When a newly elected leader does assignment, it schedules the assignment event with task cleanup in front of the queue, to make sure it gets executed before anything else. Just to confirm my understanding. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct! We just need to make sure that nothing else gets handled apart from a successful handling of "LEADER_DO_ASSIGNMENT" (with newly elected leader flag enabled) for a newly elected leader. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
_leaderDoAssignmentScheduled.set(false); | ||
}, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS); | ||
}, 0, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -1614,7 +1616,7 @@ private void revokeUnclaimedAssignmentTokens(Map<String, List<AssignmentToken>> | |
} | ||
} | ||
|
||
private void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) { | ||
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) { | ||
|
||
// Map between instance to tasks assigned to the instance. | ||
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,12 +7,12 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Deque; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Queue; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.LinkedBlockingDeque; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -44,7 +44,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware { | |
static final String GAUGE_KEY = "queuedEvents"; | ||
|
||
private final Set<CoordinatorEvent> _eventSet; | ||
private final Queue<CoordinatorEvent> _eventQueue; | ||
private final Deque<CoordinatorEvent> _eventQueue; | ||
private final DynamicMetricsManager _dynamicMetricsManager; | ||
private final Gauge<Integer> _gauge; | ||
private final Counter _counter; | ||
|
@@ -59,7 +59,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware { | |
*/ | ||
CoordinatorEventBlockingQueue(String key) { | ||
_eventSet = new HashSet<>(); | ||
_eventQueue = new LinkedBlockingQueue<>(); | ||
_eventQueue = new LinkedBlockingDeque<>(); | ||
_dynamicMetricsManager = DynamicMetricsManager.getInstance(); | ||
|
||
String prefix = buildMetricName(key); | ||
|
@@ -73,16 +73,30 @@ class CoordinatorEventBlockingQueue implements MetricsAware { | |
|
||
|
||
/** | ||
* Add a single event to the queue, overwriting events with the same name and same metadata. | ||
* Add a single event to the queue. Defaults to adding the event at the end of the queue. | ||
* @param event CoordinatorEvent event to add to the queue | ||
*/ | ||
public synchronized void put(CoordinatorEvent event) { | ||
LOG.info("Queuing event {} to event queue", event.getType()); | ||
put(event, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When do we need to support for inserting at rear? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the events to this coordinator queue are inserted in the rear. The only case in which we have to insert in the front is what the PR proposes. |
||
} | ||
|
||
/** | ||
* Add a single event to the queue, de-duping events with the same name and same metadata. | ||
* @param event CoordinatorEvent event to add to the queue | ||
* @param insertInTheEnd if true, indicates to add the event to the end of the queue and front, otherwise. | ||
*/ | ||
public synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the public APIs be similar to a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for understanding: LinkedBlockingDeque is already a thread safe and we are using only offer or offerFirst, what is the rational on having this method synchronized? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Minor]: another thing is, we want to have add from both ends but remove should be only from front. However, by using deque we will allow add/remove from both ends, Is there a way we can restrict remove from rear? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The synchronized methods were added long before but I am guessing that the shared set variable might create race and would let in duplicate events in some cases if those methods are not synchronized. That is why the wrapper on top of the LinkedBlockingDeque is implemented which only supports taking out events from the front. |
||
LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType()); | ||
if (_eventSet.contains(event)) { | ||
_counter.inc(); // count duplicate event | ||
} else { | ||
// only insert if there isn't an event present in the queue with the same name and same metadata. | ||
boolean result = _eventQueue.offer(event); | ||
boolean result; | ||
if (insertInTheEnd) { | ||
result = _eventQueue.offer(event); | ||
} else { | ||
result = _eventQueue.offerFirst(event); | ||
} | ||
if (!result) { | ||
return; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this interface? Seems like
TestCoordinator.java
has all you need.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didnt wanted to add another constructor in the TestCoordinator.java to override another method of coordinator. With this interface, we could minimize code duplication and pass the overrides of coordinator as an argument.
For the test "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath", I overrode performPreAssignmentCleanup method to test a failure path, where I am using this.