From c9fe6bbfcd64190166b2ec1fa20fd7c30b66517e Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Wed, 16 Oct 2024 20:25:37 +0800 Subject: [PATCH 1/4] YARN-11736. Enhance MultiNodeLookupPolicy to allow configuration of extended comparators for better usability. --- .../CapacitySchedulerConfiguration.java | 23 +- .../placement/MultiNodePolicySpec.java | 13 +- .../scheduler/placement/MultiNodeSorter.java | 22 +- .../placement/MultiNodeSortingManager.java | 2 +- .../policy/MultiComparatorPolicy.java | 346 ++++++++++++++++++ .../TestCapacitySchedulerMultiNodes.java | 137 ++++++- ...citySchedulerMultiNodesWithPreemption.java | 13 +- .../policy/TestMultiComparatorPolicy.java | 328 +++++++++++++++++ 8 files changed, 847 insertions(+), 37 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/TestMultiComparatorPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 8d9cf20793014..0eadac37433b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2921,6 +2921,12 @@ private void updateResourceValuesFromConfig(Set resourceTypes, @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = PREFIX + "multi-node-sorting.policy"; + public static final String MULTI_NODE_SORTING_POLICY_CURRENT_NAME = + MULTI_NODE_SORTING_POLICY_NAME + ".current-name"; + + public static final String SORTING_INTERVAL_MS_SUFFIX = + "sorting-interval.ms"; + /** * resource usage based node sorting algorithm. */ @@ -2952,16 +2958,7 @@ public String getMultiNodesSortingAlgorithmPolicy( return null; } - String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT - + policyName.trim() + DOT + "class"); - - if (policyClassName == null || policyClassName.isEmpty()) { - throw new YarnRuntimeException( - policyName.trim() + " Class is not configured or not an instance of " - + MultiNodeLookupPolicy.class.getCanonicalName()); - } - - return normalizePolicyName(policyClassName.trim()); + return policyName; } public boolean isLegacyQueueMode() { @@ -3002,7 +2999,7 @@ public Set getMultiNodePlacementPolicies() { policyClassName = normalizePolicyName(policyClassName.trim()); long policySortingInterval = getLong( MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() - + DOT + "sorting-interval.ms", + + DOT + SORTING_INTERVAL_MS_SUFFIX, DEFAULT_MULTI_NODE_SORTING_INTERVAL); if (policySortingInterval < 0) { throw new YarnRuntimeException( @@ -3010,8 +3007,8 @@ public Set getMultiNodePlacementPolicies() { + " multi-node policy is configured with invalid" + " sorting-interval:" + policySortingInterval); } - set.add( - new MultiNodePolicySpec(policyClassName, policySortingInterval)); + set.add(new MultiNodePolicySpec(str.trim(), policyClassName, + policySortingInterval)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java index 36fc3d42598a2..13d166fb4acf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java @@ -22,14 +22,25 @@ */ public class MultiNodePolicySpec { + private String policyName; private String policyClassName; private long sortingInterval; - public MultiNodePolicySpec(String policyClassName, long timeout) { + public MultiNodePolicySpec(String policyName, String policyClassName, + long timeout) { + this.setPolicyName(policyName); this.setSortingInterval(timeout); this.setPolicyClassName(policyClassName); } + public String getPolicyName() { + return policyName; + } + + public void setPolicyName(String policyName) { + this.policyName = policyName; + } + public long getSortingInterval() { return sortingInterval; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index 38af12719efa0..5968d76184b51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -63,7 +64,7 @@ public class MultiNodeSorter extends AbstractService { public MultiNodeSorter(RMContext rmContext, MultiNodePolicySpec policy) { - super("MultiNodeLookupPolicy"); + super("MultiNodeLookupPolicy:"+policy.getPolicyName()); this.rmContext = rmContext; this.policySpec = policy; } @@ -74,23 +75,30 @@ public synchronized MultiNodeLookupPolicy getMultiNodeLookupPolicy() { } public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyClassName() + LOG.info("Initializing MultiNodeSorter policyName=" + policySpec.getPolicyName() + + ", policyClassName=" + policySpec.getPolicyClassName() + ", with sorting interval=" + policySpec.getSortingInterval()); - initPolicy(policySpec.getPolicyClassName()); + initPolicy(policySpec); super.serviceInit(conf); } @SuppressWarnings("unchecked") - void initPolicy(String policyName) throws YarnException { + void initPolicy(MultiNodePolicySpec policySpec) throws YarnException { + String policyName = policySpec.getPolicyName(); + String policyClassName = policySpec.getPolicyClassName(); Class policyClass; try { - policyClass = Class.forName(policyName); + policyClass = Class.forName(policyClassName); } catch (ClassNotFoundException e) { throw new YarnException( - "Invalid policy name:" + policyName + e.getMessage()); + "Invalid policy class name:" + policyClassName + e.getMessage()); } + Configuration policyConf = new Configuration(this.getConfig()); + policyConf.set( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME, + policyName); this.multiNodePolicy = (MultiNodeLookupPolicy) ReflectionUtils - .newInstance(policyClass, null); + .newInstance(policyClass, policyConf); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java index 1acf65ee5986f..177fd7303f4c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -89,7 +89,7 @@ private void createAllPolicies() { MultiNodeSorter mon = new MultiNodeSorter(rmContext, policy); mon.init(conf); mon.start(); - runningMultiNodeSorters.put(policy.getPolicyClassName(), mon); + runningMultiNodeSorters.put(policy.getPolicyName(), mon); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java new file mode 100644 index 0000000000000..0f0deb48dd3b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.policy; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.ConfigurationException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; + +/** + *

+ * This class has the following functionality: + * + *

+ * MultiComparatorPolicy + * - manages some common comparators to help sorting nodes by + * allocated/unallocated/total resource, dominant ratio, etc. + * - holds sorted nodes list based on the of nodes at given time. + * - can be configured with specified comparators. + *

+ */ +public class MultiComparatorPolicy + implements MultiNodeLookupPolicy, Configurable { + + private static final Logger LOG = + LoggerFactory.getLogger(MultiComparatorPolicy.class); + // comparators + private static final DominantResourceCalculator DOMINANT_RC = + new DominantResourceCalculator(); + private static final Map> + COMPARATOR_CALCULATORS = Collections.unmodifiableMap( + new HashMap>() {{ + // for vcores + put(ComparatorKey.ALLOCATED_VCORES, + obj -> obj.getAllocatedResource().getVirtualCores()); + put(ComparatorKey.UNALLOCATED_VCORES, + obj -> obj.getUnallocatedResource().getVirtualCores()); + put(ComparatorKey.TOTAL_VCORES, + obj -> obj.getTotalResource().getVirtualCores()); + // for memory + put(ComparatorKey.ALLOCATED_MEMORY, + obj -> obj.getAllocatedResource().getMemorySize()); + put(ComparatorKey.UNALLOCATED_MEMORY, + obj -> obj.getUnallocatedResource().getMemorySize()); + put(ComparatorKey.TOTAL_MEMORY, + obj -> obj.getTotalResource().getMemorySize()); + // for resource + put(ComparatorKey.ALLOCATED_RESOURCE, + SchedulerNode::getAllocatedResource); + put(ComparatorKey.UNALLOCATED_RESOURCE, + SchedulerNode::getUnallocatedResource); + put(ComparatorKey.TOTAL_RESOURCE, + SchedulerNode::getTotalResource); + // for dominant ratio + put(ComparatorKey.DOMINANT_ALLOCATED_RATIO, obj -> Resources + .ratio(DOMINANT_RC, obj.getAllocatedResource(), + obj.getTotalResource())); + // for node ID + put(ComparatorKey.NODE_ID, SchedulerNode::getNodeID); + }}); + // conf keys and default values + public static final String COMPARATORS_CONF_KEY = "comparators"; + protected static final List DEFAULT_COMPARATORS = Collections + .unmodifiableList(Arrays.asList( + new Comparator(ComparatorKey.DOMINANT_ALLOCATED_RATIO, + OrderDirection.ASC, COMPARATOR_CALCULATORS + .get(ComparatorKey.DOMINANT_ALLOCATED_RATIO)), + new Comparator(ComparatorKey.NODE_ID, OrderDirection.ASC, + COMPARATOR_CALCULATORS.get(ComparatorKey.NODE_ID)))); + + protected Map> nodesPerPartition = new ConcurrentHashMap<>(); + protected List comparators; + private Configuration conf; + + public MultiComparatorPolicy() { + } + + @Override + public void setConf(Configuration conf) { + // init comparators + this.comparators = DEFAULT_COMPARATORS; + if (conf == null) { + return; + } + this.conf = conf; + String policyName = conf.get( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME); + if (policyName != null && !policyName.isEmpty()) { + String comparatorsConfV = conf.get( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + COMPARATORS_CONF_KEY); + if (comparatorsConfV != null && !comparatorsConfV.isEmpty()) { + try { + this.comparators = parseComparators(comparatorsConfV); + } catch (ConfigurationException e) { + LOG.error("Error parsing comparators for policy " + policyName + ": " + + comparatorsConfV, e); + } + } + } + LOG.info("Initialized comparators for policy {}: {}", policyName, + this.comparators); + } + + /* + * Parse comparators from comparatorsConfV with format: + * [:],[:],... + * example: + * DOMINANT_ALLOCATED_RATIO,NODE_ID:DESC + */ + private List parseComparators(String comparatorsConfV) throws ConfigurationException { + List comparators = new ArrayList<>(); + + String[] comparatorParts = comparatorsConfV.split(","); + for (String part : comparatorParts) { + String[] keyAndOrder = part.split(":"); + ComparatorKey key; + OrderDirection direction = OrderDirection.ASC; // Default to ASC + + // validate key + try { + key = ComparatorKey.valueOf(keyAndOrder[0].trim()); + } catch (IllegalArgumentException e) { + throw new ConfigurationException("invalid comparator-key: " + keyAndOrder[0]); + } + + // validate order + if (keyAndOrder.length > 1) { + try { + direction = OrderDirection.valueOf(keyAndOrder[1].trim().toUpperCase()); + } catch (IllegalArgumentException e) { + throw new ConfigurationException("invalid order-direction: " + keyAndOrder[1]); + } + } + + // validate calculator + Function calculator = + COMPARATOR_CALCULATORS.get(key); // throws if not found + if (calculator == null) { + throw new ConfigurationException("calculator not found for " + key); + } + + // add comparator + comparators.add(new Comparator(key, direction, calculator)); + } + + // validate not empty + if (comparators.isEmpty()) { + throw new ConfigurationException("no comparators found"); + } + + return comparators; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Iterator getPreferredNodeIterator(Collection nodes, + String partition) { + return getNodesPerPartition(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, + String partition) { + List> lookupNodes = new ArrayList<>(); + for (N node : nodes) { + // stream + List values = this.comparators.stream() + .map(comparator -> comparator.calculator.apply(node)) + .collect(Collectors.toList()); + lookupNodes.add(new LookupNode<>(values, node)); + } + CompositeComparator compositeComparator = + new CompositeComparator<>(this.comparators); + lookupNodes.sort(compositeComparator); + nodesPerPartition.put(partition, Collections.unmodifiableSet( + new LinkedHashSet<>(lookupNodes.stream().map(LookupNode::getNode) + .collect(Collectors.toList())))); + } + + @Override + public Set getNodesPerPartition(String partition) { + return nodesPerPartition.getOrDefault(partition, Collections.emptySet()); + } + + @VisibleForTesting + public List getComparatorKeys() { + return this.comparators.stream().map(Comparator::getKey) + .collect(Collectors.toList()); + } + + @VisibleForTesting + public List getOrderDirections() { + return comparators.stream().map(Comparator::getDirection) + .collect(Collectors.toList()); + } +} + +class Comparator { + protected ComparatorKey key; + OrderDirection direction; + Function calculator; + + public Comparator(ComparatorKey key, OrderDirection direction, + Function calculator) { + this.key = key; + this.direction = direction; + this.calculator = calculator; + } + + public ComparatorKey getKey() { + return key; + } + + public OrderDirection getDirection() { + return direction; + } + + public String toString() { + return key + ":" + direction; + } +} + +/** + * Enum for comparator keys. + */ +enum ComparatorKey { + // for vcores + ALLOCATED_VCORES, + UNALLOCATED_VCORES, + TOTAL_VCORES, + // for memory + ALLOCATED_MEMORY, + UNALLOCATED_MEMORY, + TOTAL_MEMORY, + // for resource + ALLOCATED_RESOURCE, + UNALLOCATED_RESOURCE, + TOTAL_RESOURCE, + // for dominant ratio + DOMINANT_ALLOCATED_RATIO, + // for node ID + NODE_ID, +} + +/** + * Enum for order direction. + */ +enum OrderDirection { + ASC, + DESC, +} + +/** + * LookupNode with pre-prepared comparable values. + */ +class LookupNode { + + protected List comparableValues; + + private N node; + + public LookupNode(List comparableValues, N node) { + this.comparableValues = comparableValues; + this.node = node; + } + + public N getNode() { + return node; + } +} + +/** + * Composite comparator that compares multiple values in order. + */ +class CompositeComparator implements + java.util.Comparator> { + + private List comparators; + + public CompositeComparator(List comparators) { + this.comparators = comparators; + } + + @Override + public int compare(LookupNode o1, LookupNode o2) { + for (int i = 0; i < comparators.size(); i++) { + Comparable o1Value = o1.comparableValues.get(i); + Comparable o2Value = o2.comparableValues.get(i); + int compare = comparators.get(i).direction == OrderDirection.ASC ? + o1Value.compareTo(o2Value) : + o2Value.compareTo(o1Value); + if (compare != 0) { + return compare; + } + } + return 0; + } + + public List getComparators() { + return comparators; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index 484308442ff31..99a14ab7e1098 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered; @@ -29,14 +30,17 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.policy.MultiComparatorPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.NodeId; @@ -66,6 +70,7 @@ public class TestCapacitySchedulerMultiNodes { .getLogger(TestCapacitySchedulerMultiNodes.class); private static final QueuePath DEFAULT = new QueuePath("root.default"); private CapacitySchedulerConfiguration conf; + private static final String POLICY_NAME = "resource-based"; private static final String POLICY_CLASS_NAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy"; @@ -79,12 +84,12 @@ public void setUp() { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, - "resource-based"); + POLICY_NAME); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, - "resource-based"); + POLICY_NAME); String policyName = CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME - + ".resource-based" + ".class"; + + DOT + POLICY_NAME + ".class"; conf.set(policyName, POLICY_CLASS_NAME); conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); @@ -105,7 +110,7 @@ public void testMultiNodeSorterForScheduling() throws Exception { MultiNodeSortingManager mns = rm.getRMContext() .getMultiNodeSortingManager(); MultiNodeSorter sorter = mns - .getMultiNodePolicy(POLICY_CLASS_NAME); + .getMultiNodePolicy(POLICY_NAME); sorter.reSortClusterNodes(); Set nodes = sorter.getMultiNodeLookupPolicy() .getNodesPerPartition(""); @@ -127,7 +132,7 @@ public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception { MultiNodeSortingManager mns = rm.getRMContext() .getMultiNodeSortingManager(); MultiNodeSorter sorter = mns - .getMultiNodePolicy(POLICY_CLASS_NAME); + .getMultiNodePolicy(POLICY_NAME); sorter.reSortClusterNodes(); Set nodes = sorter.getMultiNodeLookupPolicy() @@ -466,18 +471,18 @@ public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception { MultiNodeSortingManager mns = rm.getRMContext() .getMultiNodeSortingManager(); MultiNodeSorter sorter = mns - .getMultiNodePolicy(POLICY_CLASS_NAME); + .getMultiNodePolicy(POLICY_NAME); sorter.reSortClusterNodes(); Iterator nodeIterator = mns.getMultiNodeSortIterator( - nodes, partition, POLICY_CLASS_NAME); + nodes, partition, POLICY_NAME); Assert.assertEquals(4, Iterators.size(nodeIterator)); // Validate the count after missing 3 node heartbeats Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3); nodeIterator = mns.getMultiNodeSortIterator( - nodes, partition, POLICY_CLASS_NAME); + nodes, partition, POLICY_NAME); Assert.assertEquals(0, Iterators.size(nodeIterator)); rm.stop(); @@ -564,6 +569,122 @@ public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception { rm1.close(); } + @Test + public void testMultiComparatorPolicy() throws Exception { + /* + * init conf + * - configure 2 policies with EnhancedMultiNodeLookupPolicy class + * default policy: use default comparator(ALLOCATED_RESOURCE,NODE_ID) + * - enable synchronous refresh (set sorting-interval-ms to be 0) + + */ + String defaultPolicyName = "default", + allocatedResourcePolicyName = "allocated-resource-based", + enhancedPolicyClass = MultiComparatorPolicy.class.getName(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + defaultPolicyName + "," + allocatedResourcePolicyName); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + defaultPolicyName + ".class", enhancedPolicyClass); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + defaultPolicyName + DOT + + CapacitySchedulerConfiguration.SORTING_INTERVAL_MS_SUFFIX, "0"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + allocatedResourcePolicyName + DOT + + CapacitySchedulerConfiguration.SORTING_INTERVAL_MS_SUFFIX, "0"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + allocatedResourcePolicyName + ".class", enhancedPolicyClass); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + allocatedResourcePolicyName + DOT + + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "ALLOCATED_RESOURCE:ASC,NODE_ID"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + defaultPolicyName); + conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "600000"); + // mock RM and 4 NMs + // nm1, nm2, nm3 have 10 GB memory and 10 vcores each + // nm4 has 100 GB memory and 100 vcores. + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 10 * GB, 10); + MockNM nm2 = rm.registerNode("host2:1234", 10 * GB, 10); + MockNM nm3 = rm.registerNode("host3:1234", 10 * GB, 10); + MockNM nm4 = rm.registerNode("host4:1234", 100 * GB, 100); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + + // allocate for nodes + Function launchAndRegisterAM = (resource) -> { + try { + MockRMAppSubmissionData data1 = + MockRMAppSubmissionData.Builder.createWithResource( + resource, rm) + .withAppName("app-1") + .withAcls(null) + .withQueue("default") + .withUnmanagedAM(false) + .build(); + RMApp app1 = MockRMAppSubmitter.submit(rm, data1); + MockRM.launchAndRegisterAM(app1, rm, nm1); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }; + // try to assign four containers. + Resource nm1AllocatedResource = Resource.newInstance(GB, 1), + nm2AllocatedResource = Resource.newInstance(2*GB, 2), + nm3AllocatedResource = Resource.newInstance(3*GB, 3), + nm4AllocatedResource = Resource.newInstance(4*GB, 4); + launchAndRegisterAM.apply(nm1AllocatedResource); + launchAndRegisterAM.apply(nm2AllocatedResource); + launchAndRegisterAM.apply(nm3AllocatedResource); + launchAndRegisterAM.apply(nm4AllocatedResource); + // verify that four containers will be allocated sequentially to + // nm1, nm2, nm3, nm4 according to the default policy. + Assert.assertEquals( + rm.getResourceScheduler().getSchedulerNode(nm1.getNodeId()) + .getAllocatedResource(), Resource.newInstance(GB, 1)); + Assert.assertEquals( + rm.getResourceScheduler().getSchedulerNode(nm2.getNodeId()) + .getAllocatedResource(), Resource.newInstance(2 * GB, 2)); + Assert.assertEquals( + rm.getResourceScheduler().getSchedulerNode(nm3.getNodeId()) + .getAllocatedResource(), Resource.newInstance(3 * GB, 3)); + Assert.assertEquals( + rm.getResourceScheduler().getSchedulerNode(nm4.getNodeId()) + .getAllocatedResource(), Resource.newInstance(4 * GB, 4)); + + // for default policy, node4 with least dominant-resource-ratio + // should be chosen at first. + MultiNodeSorter sorter = mns + .getMultiNodePolicy(defaultPolicyName); + sorter.reSortClusterNodes(); + Set nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + Assert.assertEquals(nm4.getNodeId(), nodes.iterator().next().getNodeID()); + + // for allocatedResource policy, node1 with least allocated-resource + // should be chosen at first + sorter = mns + .getMultiNodePolicy(allocatedResourcePolicyName); + sorter.reSortClusterNodes(); + nodes = sorter.getMultiNodeLookupPolicy().getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + Assert.assertEquals(nm1.getNodeId(), nodes.iterator().next().getNodeID()); + + rm.stop(); + } + private static void moveReservation(CapacityScheduler cs, MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) { RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java index c895b58b2962a..a4533dfb2c025 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java @@ -58,6 +58,7 @@ public class TestCapacitySchedulerMultiNodesWithPreemption { private static final Logger LOG = LoggerFactory.getLogger(TestCapacitySchedulerMultiNodesWithPreemption.class); private CapacitySchedulerConfiguration conf; + private static final String POLICY_NAME = "resource-based"; private static final String POLICY_CLASS_NAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement." + "ResourceUsageMultiNodeLookupPolicy"; @@ -72,13 +73,11 @@ public void setUp() { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, - "resource-based"); + POLICY_NAME); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, - "resource-based"); - String policyName = - CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME - + ".resource-based" + ".class"; - conf.set(policyName, POLICY_CLASS_NAME); + POLICY_NAME); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + "." + POLICY_NAME + ".class", POLICY_CLASS_NAME); conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); // Set this to avoid the AM pending issue @@ -129,7 +128,7 @@ public void testAllocateReservationFromOtherNode() throws Exception { MultiNodeSortingManager mns = rm.getRMContext() .getMultiNodeSortingManager(); MultiNodeSorter sorter = mns - .getMultiNodePolicy(POLICY_CLASS_NAME); + .getMultiNodePolicy(POLICY_NAME); sorter.reSortClusterNodes(); // Step 1: Launch an App in Default Queue which utilizes the entire cluster diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/TestMultiComparatorPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/TestMultiComparatorPolicy.java new file mode 100644 index 0000000000000..7b147363a648a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/TestMultiComparatorPolicy.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.policy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; +import static org.mockito.Mockito.when; + +public class TestMultiComparatorPolicy { + private final int GB = 1024; + + @Test + public void testSetConf() { + MultiComparatorPolicy policy = new MultiComparatorPolicy(); + /* + * use default comparators for null, empty, or invalid conf + */ + // null conf + policy.setConf(null); + Assert.assertSame("use default comparators for null conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + // empty conf + Configuration conf = new Configuration(); + policy.setConf(conf); + Assert.assertSame("use default comparators for empty conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + // conf with current-name of policy but no configured comparators + String policyName = "policy1"; + conf.set( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME, + policyName); + policy.setConf(conf); + Assert.assertSame("use default comparators for empty conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + // conf with current-name of policy and empty comparators conf + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + ",,,"); + policy.setConf(conf); + Assert.assertSame("use default comparators for empty conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + // conf with current-name of policy and comparators conf with invalid comparator-key + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "INVALID"); + policy.setConf(conf); + Assert.assertSame("use default comparators for empty conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + // conf with current-name of policy and comparators conf with invalid order-direction + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "NODE_ID:INVALID"); + policy.setConf(conf); + Assert.assertSame("use default comparators for empty conf", + policy.comparators, MultiComparatorPolicy.DEFAULT_COMPARATORS); + /* + * use configured comparators for valid comparators conf + */ + // conf with current-name of policy and 1 valid comparator + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "NODE_ID:ASC"); + policy.setConf(conf); + Assert.assertEquals("configured 1 comparator", policy.getComparatorKeys(), + Collections.singletonList(ComparatorKey.NODE_ID)); + Assert.assertEquals("configured 1 comparator", policy.getOrderDirections(), + Collections.singletonList(OrderDirection.ASC)); + // conf with current-name of policy and 2 valid comparators + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "NODE_ID:ASC,ALLOCATED_RESOURCE:DESC"); + policy.setConf(conf); + Assert.assertEquals("configured 2 comparators", policy.getComparatorKeys(), + Arrays.asList(ComparatorKey.NODE_ID, ComparatorKey.ALLOCATED_RESOURCE)); + Assert.assertEquals("configured 2 comparators", policy.getOrderDirections(), + Arrays.asList(OrderDirection.ASC, OrderDirection.DESC)); + } + + @Test + public void testNodeSortingWithDifferentComparators() { + // init policy & conf + MultiComparatorPolicy policy = + new MultiComparatorPolicy<>(); + String policyName = "policy1", partitionName = "partition1"; + Configuration conf = new Configuration(); + conf.set( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME, + policyName); + + // Create nodes: node1 ~ node6 + // dominant allocated ratios: + // node1: 60%, node2: 50%, node3: 40%, node4: 40%, node5: 50%, node6: 30% + SchedulerNode node1 = createMockNode("node1", Resource.newInstance(GB, 6), + Resource.newInstance(10 * GB, 10)); + SchedulerNode node2 = + createMockNode("node2", Resource.newInstance(2 * GB, 5), + Resource.newInstance(10 * GB, 10)); + SchedulerNode node3 = + createMockNode("node3", Resource.newInstance(3 * GB, 4), + Resource.newInstance(10 * GB, 10)); + SchedulerNode node4 = + createMockNode("node4", Resource.newInstance(4 * GB, 3), + Resource.newInstance(10 * GB, 10)); + SchedulerNode node5 = + createMockNode("node5", Resource.newInstance(5 * GB, 2), + Resource.newInstance(10 * GB, 10)); + SchedulerNode node6 = + createMockNode("node6", Resource.newInstance(6 * GB, 1), + Resource.newInstance(20 * GB, 20)); + List> nodesCases = + Arrays.asList(Arrays.asList(node1, node2, node3, node4, node5, node6), + Arrays.asList(node6, node5, node4, node3, node2, node1), + Arrays.asList(node5, node1, node6, node3, node4, node2)); + /* + * expected sorted nodes in ascending order + */ + List expectedNodesByID = + Arrays.asList(node1, node2, node3, node4, node5, node6); + List expectedNodesByAllocatedMemory = + Arrays.asList(node1, node2, node3, node4, node5, node6); + List expectedNodesByAllocatedVCores = + Arrays.asList(node6, node5, node4, node3, node2, node1); + List expectedNodesByUnallocatedMemory = + Arrays.asList(node5, node4, node3, node2, node1, node6); + List expectedNodesByUnallocatedVCores = + Arrays.asList(node1, node2, node3, node4, node5, node6); + // expected nodes depend on the second comparator - NODE_ID:ASC + List expectedNodesByTotalResource = + Arrays.asList(node1, node2, node3, node4, node5, node6); + List expectedNodesByDominantResourceRatio = + Arrays.asList(node6, node3, node4, node2, node5, node1); + + // test cases + TestCase[] testCases = new TestCase[] { + // NODE_ID + new TestCase("NODE_ID", nodesCases, expectedNodesByID), + new TestCase("NODE_ID:ASC", nodesCases, expectedNodesByID), + new TestCase("NODE_ID:DESC", nodesCases, reverse(expectedNodesByID)), + // ALLOCATED_MEMORY + new TestCase("ALLOCATED_MEMORY:ASC", nodesCases, + expectedNodesByAllocatedMemory), + new TestCase("ALLOCATED_MEMORY:DESC", nodesCases, + reverse(expectedNodesByAllocatedMemory)), + // ALLOCATED_VCORES + new TestCase("ALLOCATED_VCORES:ASC", nodesCases, + expectedNodesByAllocatedVCores), + new TestCase("ALLOCATED_VCORES:DESC", nodesCases, + reverse(expectedNodesByAllocatedVCores)), + // ALLOCATED_RESOURCE + new TestCase("ALLOCATED_RESOURCE:ASC", nodesCases, + expectedNodesByAllocatedMemory), + new TestCase("ALLOCATED_RESOURCE:DESC", nodesCases, + reverse(expectedNodesByAllocatedMemory)), + // UNALLOCATED_MEMORY + new TestCase("UNALLOCATED_MEMORY:ASC", nodesCases, + expectedNodesByUnallocatedMemory), + new TestCase("UNALLOCATED_MEMORY:DESC", nodesCases, + reverse(expectedNodesByUnallocatedMemory)), + // UNALLOCATED_VCORES + new TestCase("UNALLOCATED_VCORES:ASC", nodesCases, + expectedNodesByUnallocatedVCores), + new TestCase("UNALLOCATED_VCORES:DESC", nodesCases, + reverse(expectedNodesByUnallocatedVCores)), + // UNALLOCATED_RESOURCE + new TestCase("UNALLOCATED_RESOURCE:ASC", nodesCases, + expectedNodesByUnallocatedMemory), + new TestCase("UNALLOCATED_RESOURCE:DESC", nodesCases, + reverse(expectedNodesByUnallocatedMemory)), + // TOTAL_MEMORY + new TestCase("TOTAL_MEMORY:ASC,NODE_ID:ASC", nodesCases, + expectedNodesByTotalResource), + new TestCase("TOTAL_MEMORY:DESC,NODE_ID:DESC", nodesCases, + reverse(expectedNodesByTotalResource)), + // TOTAL_VCORES + new TestCase("TOTAL_VCORES:ASC,NODE_ID:ASC", nodesCases, + expectedNodesByTotalResource), + new TestCase("TOTAL_VCORES:DESC,NODE_ID:DESC", nodesCases, + reverse(expectedNodesByTotalResource)), + // TOTAL_RESOURCE + new TestCase("TOTAL_RESOURCE:ASC,NODE_ID:ASC", nodesCases, + expectedNodesByTotalResource), + new TestCase("TOTAL_RESOURCE:DESC,NODE_ID:DESC", nodesCases, + reverse(expectedNodesByTotalResource)), + // DOMINANT_ALLOCATED_RATIO + NODE_ID + new TestCase("DOMINANT_ALLOCATED_RATIO:ASC,NODE_ID:ASC", nodesCases, + expectedNodesByDominantResourceRatio), + new TestCase("DOMINANT_ALLOCATED_RATIO:DESC,NODE_ID:DESC", nodesCases, + reverse(expectedNodesByDominantResourceRatio)) }; + + for (TestCase testCase : testCases) { + conf.set( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + testCase.comparatorsConf); + policy.setConf(conf); + + for (List nodes : testCase.nodes) { + policy.addAndRefreshNodesSet(nodes, partitionName); + List sortedNodes = + new ArrayList<>(policy.getNodesPerPartition(partitionName)); + assertNodes("Case: comparatorsConf=" + testCase.comparatorsConf, + testCase.expectedNodes, sortedNodes); + } + } + } + + @Test + public void testNodeSortingWithMultiplePartitions() { + // init policy & conf + MultiComparatorPolicy policy = + new MultiComparatorPolicy<>(); + String policyName = "policy1", partition1Name = "partition1", + partition2Name = "partition2"; + Configuration conf = new Configuration(); + conf.set( + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME, + policyName); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, + "NODE_ID:ASC"); + policy.setConf(conf); + + // Create nodes: node1 ~ node5 + SchedulerNode node1 = createMockNode("node1", Resource.newInstance(GB, 5), + Resource.newInstance(5 * GB, 5)); + SchedulerNode node2 = + createMockNode("node2", Resource.newInstance(2 * GB, 4), + Resource.newInstance(5 * GB, 5)); + SchedulerNode node3 = + createMockNode("node3", Resource.newInstance(3 * GB, 3), + Resource.newInstance(5 * GB, 5)); + SchedulerNode node4 = + createMockNode("node4", Resource.newInstance(4 * GB, 2), + Resource.newInstance(5 * GB, 5)); + // add and refresh nodes for partitions + // partition1: node1, node2 + // partition2: node3, node4 + policy.addAndRefreshNodesSet(Arrays.asList(node1, node2), partition1Name); + policy.addAndRefreshNodesSet(Arrays.asList(node4, node3), partition2Name); + + // verify sorted nodes for partition1 + List partition1SortedNodes = + new ArrayList<>(policy.getNodesPerPartition(partition1Name)); + assertNodes("Case: partition=" + partition1Name, + Arrays.asList(node1, node2), partition1SortedNodes); + + // verify sorted nodes for partition2 + List partition2SortedNodes = + new ArrayList<>(policy.getNodesPerPartition(partition2Name)); + assertNodes("Case: partition=" + partition2Name, + Arrays.asList(node3, node4), partition2SortedNodes); + } + + private SchedulerNode createMockNode(String nodeId, + Resource allocatedResource, Resource totalResource) { + SchedulerNode node = Mockito.mock(SchedulerNode.class); + when(node.getNodeID()).thenReturn(NodeId.newInstance(nodeId, 0)); + when(node.getAllocatedResource()).thenReturn(allocatedResource); + when(node.getTotalResource()).thenReturn(totalResource); + when(node.getUnallocatedResource()).thenReturn( + Resources.subtract(totalResource, allocatedResource)); + return node; + } + + private void assertNodes(String message, + List expectedSortedNodes, + List actualNodes) { + Assert.assertEquals(message, expectedSortedNodes.size(), + actualNodes.size()); + List nodeIds = actualNodes.stream().map(SchedulerNode::getNodeID) + .collect(Collectors.toList()); + List expectedIds = + expectedSortedNodes.stream().map(SchedulerNode::getNodeID) + .collect(Collectors.toList()); + Assert.assertEquals(message, expectedIds, nodeIds); + } + + private List reverse(List nodes) { + List reversedNodes = new ArrayList<>(nodes); + Collections.reverse(reversedNodes); + return reversedNodes; + } + + private static class TestCase { + String comparatorsConf; + List> nodes; + List expectedNodes; + + TestCase(String comparatorsConf, List> nodes, + List expectedNodes) { + this.comparatorsConf = comparatorsConf; + this.nodes = nodes; + this.expectedNodes = expectedNodes; + } + } +} From c0748d7fde4b36b0c5a10f6ba11ecd366aa7bd38 Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Thu, 17 Oct 2024 16:00:41 +0800 Subject: [PATCH 2/4] Refactor variable names after supporting multiple policy instances with the same policy class and improve UT. --- .../scheduler/capacity/AbstractCSQueue.java | 12 ++-- .../scheduler/capacity/CSQueue.java | 2 +- .../CapacitySchedulerConfiguration.java | 13 ++-- .../common/ApplicationSchedulingConfig.java | 4 +- .../common/fica/FiCaSchedulerApp.java | 12 ++-- .../placement/AppPlacementAllocator.java | 2 +- .../TestCapacitySchedulerMultiNodes.java | 65 +++++++++++++------ 7 files changed, 67 insertions(+), 43 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index aaab713112beb..1bdaebb3d89e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -98,7 +98,7 @@ public abstract class AbstractCSQueue implements CSQueue { final ResourceCalculator resourceCalculator; Set resourceTypes; final RMNodeLabelsManager labelManager; - private String multiNodeSortingPolicyClassName = null; + private String multiNodeSortingPolicyName = null; Map acls = new HashMap(); @@ -423,7 +423,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws getQueuePathObject()); // Update multi-node sorting algorithm for scheduling as configured. - setMultiNodeSortingPolicyClassName( + setMultiNodeSortingPolicyName( configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePathObject())); // Setup application related limits @@ -1197,12 +1197,12 @@ public void recoverDrainingState() { } @Override - public String getMultiNodeSortingPolicyClassName() { - return this.multiNodeSortingPolicyClassName; + public String getMultiNodeSortingPolicyName() { + return this.multiNodeSortingPolicyName; } - public void setMultiNodeSortingPolicyClassName(String policyName) { - this.multiNodeSortingPolicyClassName = policyName; + public void setMultiNodeSortingPolicyName(String policyName) { + this.multiNodeSortingPolicyName = policyName; } public long getMaximumApplicationLifetime() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index df3199220b286..a8ee15303f8ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -535,7 +535,7 @@ public void validateSubmitApplication(ApplicationId applicationId, * Get Multi Node scheduling policy name. * @return policy name */ - String getMultiNodeSortingPolicyClassName(); + String getMultiNodeSortingPolicyName(); /** * Get the maximum lifetime in seconds of an application which is submitted to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 0eadac37433b3..4906bf485363d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2915,11 +2915,14 @@ private void updateResourceValuesFromConfig(Set resourceTypes, } } + public static final String MULTI_NODE_SORTING_POLICY_SUFFIX = + "multi-node-sorting.policy"; + @Private public static final String MULTI_NODE_SORTING_POLICIES = PREFIX + "multi-node-sorting.policy.names"; @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = - PREFIX + "multi-node-sorting.policy"; + PREFIX + MULTI_NODE_SORTING_POLICY_SUFFIX; public static final String MULTI_NODE_SORTING_POLICY_CURRENT_NAME = MULTI_NODE_SORTING_POLICY_NAME + ".current-name"; @@ -2946,18 +2949,12 @@ public String getMultiNodesSortingAlgorithmPolicy( QueuePath queue) { String policyName = get( - getQueuePrefix(queue) + "multi-node-sorting.policy"); + getQueuePrefix(queue) + MULTI_NODE_SORTING_POLICY_SUFFIX); if (policyName == null) { policyName = get(MULTI_NODE_SORTING_POLICY_NAME); } - // If node sorting policy is not configured in queue and in cluster level, - // it is been assumed that this queue is not enabled with multi-node lookup. - if (policyName == null || policyName.isEmpty()) { - return null; - } - return policyName; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java index 06f74de96bce6..05e8e3fb3914e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -34,6 +34,6 @@ public class ApplicationSchedulingConfig { DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; @InterfaceAudience.Private - public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS = - "MULTI_NODE_SORTING_POLICY_CLASS"; + public static final String ENV_MULTI_NODE_SORTING_POLICY_NAME = + "MULTI_NODE_SORTING_POLICY_NAME"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f050dc3ebc154..011335cc64237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -187,17 +187,17 @@ private void updateMultiNodeSortingPolicy(RMApp rmApp) { return; } - String policyClassName = null; + String policyName = null; if (scheduler instanceof CapacityScheduler) { - policyClassName = getCSLeafQueue().getMultiNodeSortingPolicyClassName(); + policyName = getCSLeafQueue().getMultiNodeSortingPolicyName(); } if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( - ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS) - && policyClassName != null) { + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME) + && policyName != null) { appSchedulingInfo.getApplicationSchedulingEnvs().put( - ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS, - policyClassName); + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME, + policyName); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index 20b54298c36b2..803b13c643744 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -227,7 +227,7 @@ public void initialize(AppSchedulingInfo appSchedulingInfo, this.schedulerRequestKey = schedulerRequestKey; multiNodeSortPolicyName = appSchedulingInfo .getApplicationSchedulingEnvs().get( - ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS); + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME); multiNodeSortingManager = (MultiNodeSortingManager) rmContext .getMultiNodeSortingManager(); if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index 99a14ab7e1098..045e5d82edb1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getQueuePrefix; import java.util.ArrayList; import java.util.HashSet; @@ -30,7 +32,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; +import java.util.function.BiFunction; import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators; @@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.policy.MultiComparatorPolicy; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.NodeId; @@ -573,15 +576,26 @@ public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception { public void testMultiComparatorPolicy() throws Exception { /* * init conf - * - configure 2 policies with EnhancedMultiNodeLookupPolicy class - * default policy: use default comparator(ALLOCATED_RESOURCE,NODE_ID) + * - configure 2 policies with MultiComparatorPolicy class + * default: use default comparator + * (DOMINANT_RESOURCE_RATIO:ASC,NODE_ID:ASC) + * test: use custom comparator (ALLOCATED_RESOURCE:ASC,NODE_ID:ASC) * - enable synchronous refresh (set sorting-interval-ms to be 0) - + * - configure queue "test" to use test policy. */ - String defaultPolicyName = "default", - allocatedResourcePolicyName = "allocated-resource-based", + String defaultQueueName = "default", defaultPolicyName = "default", + testQueueName = "test", testPolicyName = "test", enhancedPolicyClass = MultiComparatorPolicy.class.getName(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + // init queues + conf.setQueues(ROOT, new String[]{defaultQueueName, testQueueName}); + QueuePath defaultQueuePath = + QueuePath.createFromQueues(ROOT.getFullPath(), defaultQueueName); + QueuePath testQueuePath = + QueuePath.createFromQueues(ROOT.getFullPath(), testQueueName); + conf.setCapacity(defaultQueuePath, 50.0f); + conf.setCapacity(testQueuePath, 50.0f); + conf.setMaximumApplicationMasterResourcePercent(1.0f); conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, DominantResourceCalculator.class.getName()); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, @@ -589,23 +603,26 @@ public void testMultiComparatorPolicy() throws Exception { conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, - defaultPolicyName + "," + allocatedResourcePolicyName); + defaultPolicyName + "," + testPolicyName); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + defaultPolicyName + ".class", enhancedPolicyClass); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT + defaultPolicyName + DOT + CapacitySchedulerConfiguration.SORTING_INTERVAL_MS_SUFFIX, "0"); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT - + allocatedResourcePolicyName + DOT + + testPolicyName + DOT + CapacitySchedulerConfiguration.SORTING_INTERVAL_MS_SUFFIX, "0"); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT - + allocatedResourcePolicyName + ".class", enhancedPolicyClass); + + testPolicyName + ".class", enhancedPolicyClass); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + DOT - + allocatedResourcePolicyName + DOT + + testPolicyName + DOT + MultiComparatorPolicy.COMPARATORS_CONF_KEY, "ALLOCATED_RESOURCE:ASC,NODE_ID"); conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, defaultPolicyName); + conf.set(getQueuePrefix(testQueuePath) + + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_SUFFIX, + testPolicyName); conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "600000"); // mock RM and 4 NMs // nm1, nm2, nm3 have 10 GB memory and 10 vcores each @@ -622,14 +639,14 @@ public void testMultiComparatorPolicy() throws Exception { .getMultiNodeSortingManager(); // allocate for nodes - Function launchAndRegisterAM = (resource) -> { + BiFunction launchAndRegisterAM = (queue, resource) -> { try { MockRMAppSubmissionData data1 = MockRMAppSubmissionData.Builder.createWithResource( resource, rm) .withAppName("app-1") .withAcls(null) - .withQueue("default") + .withQueue(queue) .withUnmanagedAM(false) .build(); RMApp app1 = MockRMAppSubmitter.submit(rm, data1); @@ -644,10 +661,10 @@ public void testMultiComparatorPolicy() throws Exception { nm2AllocatedResource = Resource.newInstance(2*GB, 2), nm3AllocatedResource = Resource.newInstance(3*GB, 3), nm4AllocatedResource = Resource.newInstance(4*GB, 4); - launchAndRegisterAM.apply(nm1AllocatedResource); - launchAndRegisterAM.apply(nm2AllocatedResource); - launchAndRegisterAM.apply(nm3AllocatedResource); - launchAndRegisterAM.apply(nm4AllocatedResource); + launchAndRegisterAM.apply(defaultQueueName, nm1AllocatedResource); + launchAndRegisterAM.apply(defaultQueueName, nm2AllocatedResource); + launchAndRegisterAM.apply(defaultQueueName, nm3AllocatedResource); + launchAndRegisterAM.apply(defaultQueueName, nm4AllocatedResource); // verify that four containers will be allocated sequentially to // nm1, nm2, nm3, nm4 according to the default policy. Assert.assertEquals( @@ -663,7 +680,7 @@ public void testMultiComparatorPolicy() throws Exception { rm.getResourceScheduler().getSchedulerNode(nm4.getNodeId()) .getAllocatedResource(), Resource.newInstance(4 * GB, 4)); - // for default policy, node4 with least dominant-resource-ratio + // for default policy, nm4 with least dominant-resource-ratio // should be chosen at first. MultiNodeSorter sorter = mns .getMultiNodePolicy(defaultPolicyName); @@ -673,15 +690,25 @@ public void testMultiComparatorPolicy() throws Exception { Assert.assertEquals(4, nodes.size()); Assert.assertEquals(nm4.getNodeId(), nodes.iterator().next().getNodeID()); - // for allocatedResource policy, node1 with least allocated-resource + // for test policy, nm1 with least allocated-resource // should be chosen at first sorter = mns - .getMultiNodePolicy(allocatedResourcePolicyName); + .getMultiNodePolicy(testPolicyName); sorter.reSortClusterNodes(); nodes = sorter.getMultiNodeLookupPolicy().getNodesPerPartition(""); Assert.assertEquals(4, nodes.size()); Assert.assertEquals(nm1.getNodeId(), nodes.iterator().next().getNodeID()); + // schedule for app in test queue with policy=test, + // verify that nm1 will be chosen + Resource nm1AddResource = Resource.newInstance(6 * GB, 4); + launchAndRegisterAM.apply(testQueuePath.getLeafName(), nm1AddResource); + Resource expectedAllocatedResourceForNM1 = + Resources.add(nm1AllocatedResource, nm1AddResource); + Assert.assertEquals( + rm.getResourceScheduler().getSchedulerNode(nm1.getNodeId()) + .getAllocatedResource(), expectedAllocatedResourceForNM1); + rm.stop(); } From 95ef881da9f990132dd30eb4396dae2ceeff11e9 Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Fri, 18 Oct 2024 21:08:10 +0800 Subject: [PATCH 3/4] Add javadoc for important fields. --- .../CapacitySchedulerConfiguration.java | 15 +++++++++++++++ .../placement/MultiNodePolicySpec.java | 3 ++- .../scheduler/placement/MultiNodeSorter.java | 6 ++---- .../placement/policy/MultiComparatorPolicy.java | 17 ++++++++++++++++- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 4906bf485363d..a5df2ca07ab9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2915,6 +2915,15 @@ private void updateResourceValuesFromConfig(Set resourceTypes, } } + /** + * Common suffix for the MultiNodeSortingPolicy configuration item. + * For example, you can use the following configuration item to + * set the default policy: + * yarn.scheduler.capacity.multi-node-sorting.policy= + * You can also use the following configuration items to set + * specific policies for individual queues: + * yarn.scheduler.capacity..multi-node-sorting.policy= + */ public static final String MULTI_NODE_SORTING_POLICY_SUFFIX = "multi-node-sorting.policy"; @@ -2924,6 +2933,12 @@ private void updateResourceValuesFromConfig(Set resourceTypes, @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = PREFIX + MULTI_NODE_SORTING_POLICY_SUFFIX; + /** + * Configuration key for the current policy name of the MultiNodeSortingPolicy + * instance. This is an instance-level configuration used to pass the + * policyName to the instance, allowing the instance to retrieve the + * corresponding configuration. + */ public static final String MULTI_NODE_SORTING_POLICY_CURRENT_NAME = MULTI_NODE_SORTING_POLICY_NAME + ".current-name"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java index 13d166fb4acf7..5fe1b50777b73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java @@ -60,7 +60,8 @@ public void setPolicyClassName(String policyClassName) { @Override public String toString() { return "MultiNodePolicySpec {" + - "policyClassName='" + policyClassName + '\'' + + "policyName='" + policyName + '\'' + + ", policyClassName='" + policyClassName + '\'' + ", sortingInterval=" + sortingInterval + '}'; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index 5968d76184b51..2d81657d0788e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -64,7 +64,7 @@ public class MultiNodeSorter extends AbstractService { public MultiNodeSorter(RMContext rmContext, MultiNodePolicySpec policy) { - super("MultiNodeLookupPolicy:"+policy.getPolicyName()); + super("MultiNodeLookupPolicy: " + policy.getPolicyName()); this.rmContext = rmContext; this.policySpec = policy; } @@ -75,9 +75,7 @@ public synchronized MultiNodeLookupPolicy getMultiNodeLookupPolicy() { } public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing MultiNodeSorter policyName=" + policySpec.getPolicyName() - + ", policyClassName=" + policySpec.getPolicyClassName() - + ", with sorting interval=" + policySpec.getSortingInterval()); + LOG.info("Initializing MultiNodeSorter with {}", policySpec); initPolicy(policySpec); super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java index 0f0deb48dd3b3..9ff50fe6966a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java @@ -97,8 +97,23 @@ public class MultiComparatorPolicy // for node ID put(ComparatorKey.NODE_ID, SchedulerNode::getNodeID); }}); - // conf keys and default values + + /* + * Configuration key for specifying comparators in a MultiComparatorPolicy instance. + * Use this key to define comparators for a policy instance as follows: + * yarn.scheduler.capacity.multi-node-sorting-policy..comparators= + * The value should be a comma-separated list of comparator keys with optional + * order directions (ASC by default). + * Example: DOMINANT_ALLOCATED_RATIO,NODE_ID:DESC + */ public static final String COMPARATORS_CONF_KEY = "comparators"; + + /* + * Default comparators for MultiComparatorPolicy: + * DOMINANT_ALLOCATED_RATIO:ASC,NODE_ID:ASC, + * The default comparators are used when no comparators or invalid comparators + * are specified in the configuration. + */ protected static final List DEFAULT_COMPARATORS = Collections .unmodifiableList(Arrays.asList( new Comparator(ComparatorKey.DOMINANT_ALLOCATED_RATIO, From 938e0d6cca4ddb04291741595d971ee1ff3058c0 Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Thu, 24 Oct 2024 09:19:21 +0800 Subject: [PATCH 4/4] Clear useless comment and improve list construction. --- .../scheduler/placement/policy/MultiComparatorPolicy.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java index 9ff50fe6966a9..2dbf04e2d1c22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/policy/MultiComparatorPolicy.java @@ -220,9 +220,8 @@ public Iterator getPreferredNodeIterator(Collection nodes, @Override public void addAndRefreshNodesSet(Collection nodes, String partition) { - List> lookupNodes = new ArrayList<>(); + List> lookupNodes = new ArrayList<>(nodes.size()); for (N node : nodes) { - // stream List values = this.comparators.stream() .map(comparator -> comparator.calculator.apply(node)) .collect(Collectors.toList());