Skip to content

Commit 9ee5265

Browse files
committed
YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
1 parent cc84223 commit 9ee5265

File tree

1 file changed

+51
-14
lines changed

1 file changed

+51
-14
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
2626

2727
import java.util.ArrayList;
28+
import java.util.Collection;
2829
import java.util.Collections;
2930
import java.util.Comparator;
3031
import java.util.Iterator;
@@ -95,33 +96,35 @@ public static int compare(double relativeAssigned1, double relativeAssigned2,
9596
/**
9697
* Comparator that looks at both priority and utilization
9798
*/
98-
private class PriorityQueueComparator implements Comparator<CSQueue> {
99+
private class PriorityQueueComparator
100+
implements Comparator<PriorityQueueResourcesForSorting> {
99101

100102
@Override
101-
public int compare(CSQueue q1, CSQueue q2) {
103+
public int compare(PriorityQueueResourcesForSorting q1Sort,
104+
PriorityQueueResourcesForSorting q2Sort) {
102105
String p = partitionToLookAt.get();
103106

104-
int rc = compareQueueAccessToPartition(q1, q2, p);
107+
int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
105108
if (0 != rc) {
106109
return rc;
107110
}
108111

109-
float used1 = q1.getQueueCapacities().getUsedCapacity(p);
110-
float used2 = q2.getQueueCapacities().getUsedCapacity(p);
112+
float used1 = q1Sort.usedCapacity;
113+
float used2 = q2Sort.usedCapacity;
111114
int p1 = 0;
112115
int p2 = 0;
113116
if (respectPriority) {
114-
p1 = q1.getPriority().getPriority();
115-
p2 = q2.getPriority().getPriority();
117+
p1 = q1Sort.queue.getPriority().getPriority();
118+
p2 = q2Sort.queue.getPriority().getPriority();
116119
}
117120

118121
rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2);
119122

120123
// For queues with the same used ratio / priority, the queue with higher configured
121124
// capacity goes first
122125
if (0 == rc) {
123-
float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
124-
float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
126+
float abs1 = q1Sort.absoluteCapacity;
127+
float abs2 = q2Sort.absoluteCapacity;
125128
return Float.compare(abs2, abs1);
126129
}
127130

@@ -156,6 +159,29 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partiti
156159
}
157160
}
158161

162+
/**
163+
* A simple storage class to represent a snapshot of a queue.
164+
*/
165+
public static class PriorityQueueResourcesForSorting {
166+
private final float usedCapacity;
167+
private final float absoluteCapacity;
168+
private final CSQueue queue;
169+
170+
PriorityQueueResourcesForSorting(CSQueue queue) {
171+
this.queue = queue;
172+
this.usedCapacity =
173+
queue.getQueueCapacities().
174+
getUsedCapacity(partitionToLookAt.get());
175+
this.absoluteCapacity =
176+
queue.getQueueCapacities().
177+
getAbsoluteCapacity(partitionToLookAt.get());
178+
}
179+
180+
public CSQueue getQueue() {
181+
return queue;
182+
}
183+
}
184+
159185
public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
160186
this.respectPriority = respectPriority;
161187
}
@@ -167,12 +193,23 @@ public void setQueues(List<CSQueue> queues) {
167193

168194
@Override
169195
public Iterator<CSQueue> getAssignmentIterator(String partition) {
170-
// Since partitionToLookAt is a thread local variable, and every time we
171-
// copy and sort queues, so it's safe for multi-threading environment.
196+
// partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
172197
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
173-
List<CSQueue> sortedQueue = new ArrayList<>(queues);
174-
Collections.sort(sortedQueue, new PriorityQueueComparator());
175-
return sortedQueue.iterator();
198+
199+
// Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort.
200+
// See YARN-10178 for details.
201+
List<PriorityQueueResourcesForSorting> queueSnapshots = new ArrayList<>();
202+
for (CSQueue queue : queues) {
203+
queueSnapshots.add(new PriorityQueueResourcesForSorting(queue));
204+
}
205+
Collections.sort(queueSnapshots, new PriorityQueueComparator());
206+
207+
List<CSQueue> sortedQueues = new ArrayList<>();
208+
for (PriorityQueueResourcesForSorting queueSnapshot : queueSnapshots) {
209+
sortedQueues.add(queueSnapshot.queue);
210+
}
211+
212+
return sortedQueues.iterator();
176213
}
177214

178215
@Override

0 commit comments

Comments
 (0)