Skip to content

Commit 8a610e6

Browse files
authored
YARN-11614. [Federation] Add Federation PolicyManager Validation Rules. (#6271) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent abd550c commit 8a610e6

File tree

3 files changed

+47
-7
lines changed

3 files changed

+47
-7
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,4 +822,12 @@ public static boolean isRouterWebProxyEnable(Configuration conf) {
822822
return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
823823
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
824824
}
825+
826+
public static boolean checkPolicyManagerValid(String policyManager,
827+
List<String> supportWeightList) throws YarnException {
828+
if (supportWeightList.contains(policyManager)) {
829+
return true;
830+
}
831+
return false;
832+
}
825833
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@
7575
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
7676
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
7777
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
78+
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
79+
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager;
80+
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
7881
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
7982
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
8083
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -92,6 +95,7 @@
9295
import java.io.IOException;
9396
import java.nio.ByteBuffer;
9497
import java.util.List;
98+
import java.util.Arrays;
9599
import java.util.ArrayList;
96100
import java.util.Map;
97101
import java.util.HashMap;
@@ -107,6 +111,8 @@
107111
import java.util.concurrent.ConcurrentHashMap;
108112
import java.util.stream.Collectors;
109113

114+
import static org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid;
115+
110116
public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
111117

112118
private static final Logger LOG =
@@ -115,6 +121,10 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
115121
private static final String COMMA = ",";
116122
private static final String COLON = ":";
117123

124+
private static final List<String> SUPPORT_WEIGHT_MANAGERS =
125+
new ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(),
126+
PriorityBroadcastPolicyManager.class.getName(), WeightedHomePolicyManager.class.getName()));
127+
118128
private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
119129
private FederationStateStoreFacade federationFacade;
120130
private final Clock clock = new MonotonicClock();
@@ -924,6 +934,13 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
924934
RouterServerUtil.logAndThrowException("Missing Queue information.", null);
925935
}
926936

937+
String policyManagerClassName = request.getPolicyManagerClassName();
938+
if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
939+
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
940+
RouterServerUtil.logAndThrowException(policyManagerClassName +
941+
" does not support the use of queue weights.", null);
942+
}
943+
927944
String amRmWeight = federationQueueWeight.getAmrmWeight();
928945
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
929946

@@ -935,9 +952,6 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
935952

936953
try {
937954
long startTime = clock.getTime();
938-
// Step1, get parameters.
939-
String policyManagerClassName = request.getPolicyManagerClassName();
940-
941955

942956
// Step2, parse amRMPolicyWeights.
943957
Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
@@ -1346,6 +1360,12 @@ private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeig
13461360
RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null);
13471361
}
13481362

1363+
if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
1364+
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
1365+
RouterServerUtil.logAndThrowException(policyManagerClassName +
1366+
"does not support the use of queue weights.", null);
1367+
}
1368+
13491369
String amRmWeight = federationQueueWeight.getAmrmWeight();
13501370
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
13511371

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -643,15 +643,27 @@ public void testSaveFederationQueuePolicyErrorRequest() throws Exception {
643643
LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.",
644644
() -> interceptor.saveFederationQueuePolicy(request));
645645

646-
// routerWeight / amrmWeight
647-
// The sum of the routerWeight is not equal to 1.
646+
// PolicyManager needs to support weight
648647
FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance(
649648
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
650649
SaveFederationQueuePolicyRequest request2 =
651-
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-");
650+
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2,
651+
"TestPolicyManager");
652652
LambdaTestUtils.intercept(YarnException.class,
653-
"The sum of ratios for all subClusters must be equal to 1.",
653+
"TestPolicyManager does not support the use of queue weights.",
654654
() -> interceptor.saveFederationQueuePolicy(request2));
655+
656+
// routerWeight / amrmWeight
657+
// The sum of the routerWeight is not equal to 1.
658+
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
659+
FederationQueueWeight federationQueueWeight3 = FederationQueueWeight.newInstance(
660+
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
661+
SaveFederationQueuePolicyRequest request3 =
662+
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight3,
663+
policyTypeName);
664+
LambdaTestUtils.intercept(YarnException.class,
665+
"The sum of ratios for all subClusters must be equal to 1.",
666+
() -> interceptor.saveFederationQueuePolicy(request3));
655667
}
656668

657669
@Test

0 commit comments

Comments
 (0)