Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11736. Enhance MultiNodeLookupPolicy to allow configuration of extended comparators for better usability. #7121

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final ResourceCalculator resourceCalculator;
Set<String> resourceTypes;
final RMNodeLabelsManager labelManager;
private String multiNodeSortingPolicyClassName = null;
private String multiNodeSortingPolicyName = null;

Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
Expand Down Expand Up @@ -423,7 +423,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
getQueuePathObject());

// Update multi-node sorting algorithm for scheduling as configured.
setMultiNodeSortingPolicyClassName(
setMultiNodeSortingPolicyName(
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePathObject()));

// Setup application related limits
Expand Down Expand Up @@ -1197,12 +1197,12 @@ public void recoverDrainingState() {
}

@Override
public String getMultiNodeSortingPolicyClassName() {
return this.multiNodeSortingPolicyClassName;
public String getMultiNodeSortingPolicyName() {
return this.multiNodeSortingPolicyName;
}

public void setMultiNodeSortingPolicyClassName(String policyName) {
this.multiNodeSortingPolicyClassName = policyName;
public void setMultiNodeSortingPolicyName(String policyName) {
this.multiNodeSortingPolicyName = policyName;
}

public long getMaximumApplicationLifetime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void validateSubmitApplication(ApplicationId applicationId,
* Get Multi Node scheduling policy name.
* @return policy name
*/
String getMultiNodeSortingPolicyClassName();
String getMultiNodeSortingPolicyName();

/**
* Get the maximum lifetime in seconds of an application which is submitted to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2915,11 +2915,20 @@ private void updateResourceValuesFromConfig(Set<String> resourceTypes,
}
}

public static final String MULTI_NODE_SORTING_POLICY_SUFFIX =
TaoYang526 marked this conversation as resolved.
Show resolved Hide resolved
"multi-node-sorting.policy";

@Private public static final String MULTI_NODE_SORTING_POLICIES =
PREFIX + "multi-node-sorting.policy.names";

@Private public static final String MULTI_NODE_SORTING_POLICY_NAME =
PREFIX + "multi-node-sorting.policy";
PREFIX + MULTI_NODE_SORTING_POLICY_SUFFIX;

public static final String MULTI_NODE_SORTING_POLICY_CURRENT_NAME =
MULTI_NODE_SORTING_POLICY_NAME + ".current-name";
TaoYang526 marked this conversation as resolved.
Show resolved Hide resolved

public static final String SORTING_INTERVAL_MS_SUFFIX =
"sorting-interval.ms";

/**
* resource usage based node sorting algorithm.
Expand All @@ -2940,28 +2949,13 @@ public String getMultiNodesSortingAlgorithmPolicy(
QueuePath queue) {

String policyName = get(
getQueuePrefix(queue) + "multi-node-sorting.policy");
getQueuePrefix(queue) + MULTI_NODE_SORTING_POLICY_SUFFIX);

if (policyName == null) {
policyName = get(MULTI_NODE_SORTING_POLICY_NAME);
}

// If node sorting policy is not configured in queue and in cluster level,
// it is been assumed that this queue is not enabled with multi-node lookup.
if (policyName == null || policyName.isEmpty()) {
return null;
}

String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT
+ policyName.trim() + DOT + "class");

if (policyClassName == null || policyClassName.isEmpty()) {
throw new YarnRuntimeException(
policyName.trim() + " Class is not configured or not an instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}

return normalizePolicyName(policyClassName.trim());
return policyName;
}

public boolean isLegacyQueueMode() {
Expand Down Expand Up @@ -3002,16 +2996,16 @@ public Set<MultiNodePolicySpec> getMultiNodePlacementPolicies() {
policyClassName = normalizePolicyName(policyClassName.trim());
long policySortingInterval = getLong(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim()
+ DOT + "sorting-interval.ms",
+ DOT + SORTING_INTERVAL_MS_SUFFIX,
DEFAULT_MULTI_NODE_SORTING_INTERVAL);
if (policySortingInterval < 0) {
throw new YarnRuntimeException(
str.trim()
+ " multi-node policy is configured with invalid"
+ " sorting-interval:" + policySortingInterval);
}
set.add(
new MultiNodePolicySpec(policyClassName, policySortingInterval));
set.add(new MultiNodePolicySpec(str.trim(), policyClassName,
policySortingInterval));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public class ApplicationSchedulingConfig {
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;

@InterfaceAudience.Private
public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS =
"MULTI_NODE_SORTING_POLICY_CLASS";
public static final String ENV_MULTI_NODE_SORTING_POLICY_NAME =
"MULTI_NODE_SORTING_POLICY_NAME";
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,17 @@ private void updateMultiNodeSortingPolicy(RMApp rmApp) {
return;
}

String policyClassName = null;
String policyName = null;
if (scheduler instanceof CapacityScheduler) {
policyClassName = getCSLeafQueue().getMultiNodeSortingPolicyClassName();
policyName = getCSLeafQueue().getMultiNodeSortingPolicyName();
}

if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS)
&& policyClassName != null) {
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME)
&& policyName != null) {
appSchedulingInfo.getApplicationSchedulingEnvs().put(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS,
policyClassName);
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME,
policyName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void initialize(AppSchedulingInfo appSchedulingInfo,
this.schedulerRequestKey = schedulerRequestKey;
multiNodeSortPolicyName = appSchedulingInfo
.getApplicationSchedulingEnvs().get(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_NAME);
multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
.getMultiNodeSortingManager();
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,25 @@
*/
public class MultiNodePolicySpec {

private String policyName;
private String policyClassName;
private long sortingInterval;

public MultiNodePolicySpec(String policyClassName, long timeout) {
public MultiNodePolicySpec(String policyName, String policyClassName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why both policyName and policyClassName is required here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MultiNodeSorter#initPolicy, Policy instance will be created based on policyClassName, and policyName will be used to get conf belong to this policy instance in MultiComparatorPolicy#setConf. This is the only way for every policy instance to know which configuration belong to it, another way is to update the policy interface that I prefer not to use. If there are better approaches, feel free to propose them.

long timeout) {
this.setPolicyName(policyName);
this.setSortingInterval(timeout);
this.setPolicyClassName(policyClassName);
}

public String getPolicyName() {
return policyName;
}

public void setPolicyName(String policyName) {
this.policyName = policyName;
}

public long getSortingInterval() {
return sortingInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {

public MultiNodeSorter(RMContext rmContext,
MultiNodePolicySpec policy) {
super("MultiNodeLookupPolicy");
super("MultiNodeLookupPolicy:"+policy.getPolicyName());
TaoYang526 marked this conversation as resolved.
Show resolved Hide resolved
this.rmContext = rmContext;
this.policySpec = policy;
}
Expand All @@ -74,23 +75,30 @@ public synchronized MultiNodeLookupPolicy<N> getMultiNodeLookupPolicy() {
}

public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyClassName()
LOG.info("Initializing MultiNodeSorter policyName=" + policySpec.getPolicyName()
+ ", policyClassName=" + policySpec.getPolicyClassName()
+ ", with sorting interval=" + policySpec.getSortingInterval());
initPolicy(policySpec.getPolicyClassName());
initPolicy(policySpec);
super.serviceInit(conf);
}

@SuppressWarnings("unchecked")
void initPolicy(String policyName) throws YarnException {
void initPolicy(MultiNodePolicySpec policySpec) throws YarnException {
String policyName = policySpec.getPolicyName();
String policyClassName = policySpec.getPolicyClassName();
Class<?> policyClass;
try {
policyClass = Class.forName(policyName);
policyClass = Class.forName(policyClassName);
} catch (ClassNotFoundException e) {
throw new YarnException(
"Invalid policy name:" + policyName + e.getMessage());
"Invalid policy class name:" + policyClassName + e.getMessage());
}
Configuration policyConf = new Configuration(this.getConfig());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse config object instead of creating new one ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MultiNodeSortingManager#createAllPolicies, we can see all the MultiNodeSorter instances owns a shared config, policyName will be set in policyConf, which is a instance-level configuration, so that policyInstance can get the configurations belong to itself.

policyConf.set(
CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_CURRENT_NAME,
policyName);
this.multiNodePolicy = (MultiNodeLookupPolicy<N>) ReflectionUtils
.newInstance(policyClass, null);
.newInstance(policyClass, policyConf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void createAllPolicies() {
MultiNodeSorter<N> mon = new MultiNodeSorter<N>(rmContext, policy);
mon.init(conf);
mon.start();
runningMultiNodeSorters.put(policy.getPolicyClassName(), mon);
runningMultiNodeSorters.put(policy.getPolicyName(), mon);
}
}

Expand Down
Loading
Loading