Skip to content

Commit f8b9dd9

Browse files
authored
YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST APIs for Router. (#4757)
1 parent 5736b34 commit f8b9dd9

File tree

7 files changed

+383
-4
lines changed

7 files changed

+383
-4
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ApplicationStatisticsInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
1919

2020
import java.util.ArrayList;
21+
import java.util.Collection;
2122

2223
import javax.xml.bind.annotation.XmlAccessType;
2324
import javax.xml.bind.annotation.XmlAccessorType;
@@ -33,6 +34,10 @@ public class ApplicationStatisticsInfo {
3334
public ApplicationStatisticsInfo() {
3435
} // JAXB needs this
3536

37+
public ApplicationStatisticsInfo(Collection<StatisticsItemInfo> items) {
38+
statItem.addAll(items);
39+
}
40+
3641
public void add(StatisticsItemInfo statItem) {
3742
this.statItem.add(statItem);
3843
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public StatisticsItemInfo(
4141
this.count = count;
4242
}
4343

44+
public StatisticsItemInfo(StatisticsItemInfo info) {
45+
this.state = info.state;
46+
this.type = info.type;
47+
this.count = info.count;
48+
}
49+
4450
public YarnApplicationState getState() {
4551
return state;
4652
}
@@ -53,4 +59,7 @@ public long getCount() {
5359
return count;
5460
}
5561

62+
public void setCount(long count) {
63+
this.count = count;
64+
}
5665
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,13 +1129,50 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
11291129
String appId, String time, Set<String> requestPriorities,
11301130
Set<String> allocationRequestIds, String groupBy, String limit,
11311131
Set<String> actions, boolean summarize) {
1132-
throw new NotImplementedException("Code is not implemented");
1132+
1133+
// Only verify the app_id,
1134+
// because the specific subCluster needs to be found according to the app_id,
1135+
// and other verifications are directly handed over to the corresponding subCluster RM
1136+
if (appId == null || appId.isEmpty()) {
1137+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1138+
}
1139+
1140+
try {
1141+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
1142+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1143+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1144+
1145+
final HttpServletRequest hsrCopy = clone(hsr);
1146+
return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities,
1147+
allocationRequestIds, groupBy, limit, actions, summarize);
1148+
} catch (IllegalArgumentException e) {
1149+
RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get subCluster by appId: %s.",
1150+
appId);
1151+
} catch (YarnException e) {
1152+
RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e);
1153+
}
1154+
1155+
return null;
11331156
}
11341157

11351158
@Override
11361159
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
11371160
Set<String> stateQueries, Set<String> typeQueries) {
1138-
throw new NotImplementedException("Code is not implemented");
1161+
try {
1162+
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
1163+
final HttpServletRequest hsrCopy = clone(hsr);
1164+
Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class};
1165+
Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries};
1166+
ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args);
1167+
Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent(
1168+
subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class);
1169+
return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
1170+
} catch (IOException e) {
1171+
RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error.");
1172+
} catch (YarnException e) {
1173+
RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error.");
1174+
}
1175+
return null;
11391176
}
11401177

11411178
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
5858
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
5959
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
60+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
61+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
6062
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
6163
import org.apache.hadoop.yarn.webapp.BadRequestException;
6264
import org.apache.hadoop.yarn.webapp.ForbiddenException;
@@ -540,4 +542,38 @@ public static NodeToLabelsInfo mergeNodeToLabels(
540542

541543
return new NodeToLabelsInfo(nodeToLabels);
542544
}
545+
546+
public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo(
547+
Collection<ApplicationStatisticsInfo> appStatistics) {
548+
ApplicationStatisticsInfo result = new ApplicationStatisticsInfo();
549+
Map<String, StatisticsItemInfo> statisticsItemMap = new HashMap<>();
550+
551+
appStatistics.stream().forEach(appStatistic -> {
552+
List<StatisticsItemInfo> statisticsItemInfos = appStatistic.getStatItems();
553+
for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) {
554+
555+
String statisticsItemKey =
556+
statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString();
557+
558+
StatisticsItemInfo statisticsItemValue;
559+
if (statisticsItemMap.containsKey(statisticsItemKey)) {
560+
statisticsItemValue = statisticsItemMap.get(statisticsItemKey);
561+
long statisticsItemValueCount = statisticsItemValue.getCount();
562+
long statisticsItemInfoCount = statisticsItemInfo.getCount();
563+
long newCount = statisticsItemValueCount + statisticsItemInfoCount;
564+
statisticsItemValue.setCount(newCount);
565+
} else {
566+
statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo);
567+
}
568+
569+
statisticsItemMap.put(statisticsItemKey, statisticsItemValue);
570+
}
571+
});
572+
573+
if (!statisticsItemMap.isEmpty()) {
574+
result.getStatItems().addAll(statisticsItemMap.values());
575+
}
576+
577+
return result;
578+
}
543579
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Collections;
2828
import java.util.Arrays;
2929
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.stream.Collectors;
3032

3133
import javax.servlet.http.HttpServletRequest;
3234
import javax.servlet.http.HttpServletResponse;
@@ -58,6 +60,20 @@
5860
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
5961
import org.apache.hadoop.yarn.exceptions.YarnException;
6062
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
63+
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
64+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
65+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
66+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
67+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
68+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
69+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
70+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
71+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
72+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
73+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
74+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
75+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
76+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
6177
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
6278
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
6379
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
@@ -78,13 +94,22 @@
7894
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
7995
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
8096
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
97+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
98+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
99+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
100+
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
81101
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
82102
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
83103
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
104+
import org.apache.hadoop.yarn.util.SystemClock;
105+
import org.apache.hadoop.yarn.util.resource.Resources;
84106
import org.apache.hadoop.yarn.webapp.NotFoundException;
107+
import org.mockito.Mockito;
85108
import org.slf4j.Logger;
86109
import org.slf4j.LoggerFactory;
87110

111+
import static org.mockito.Mockito.mock;
112+
88113
/**
89114
* This class mocks the RESTRequestInterceptor.
90115
*/
@@ -132,8 +157,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
132157
// Initialize appReport
133158
ApplicationReport appReport = ApplicationReport.newInstance(
134159
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
135-
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
136-
false, Priority.newInstance(newApp.getPriority()), null, null);
160+
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0,
161+
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()),
162+
null, null);
137163

138164
// Initialize appTimeoutsMap
139165
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
@@ -661,4 +687,105 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str
661687
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
662688
return Response.status(Status.OK).entity(targetAppQueue).build();
663689
}
690+
691+
public void updateApplicationState(YarnApplicationState appState, String appId)
692+
throws AuthorizationException, YarnException, InterruptedException, IOException {
693+
validateRunning();
694+
ApplicationId applicationId = ApplicationId.fromString(appId);
695+
if (!applicationMap.containsKey(applicationId)) {
696+
throw new NotFoundException("app with id: " + appId + " not found");
697+
}
698+
ApplicationReport appReport = applicationMap.get(applicationId);
699+
appReport.setYarnApplicationState(appState);
700+
}
701+
702+
@Override
703+
public ApplicationStatisticsInfo getAppStatistics(
704+
HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) {
705+
if (!isRunning) {
706+
throw new RuntimeException("RM is stopped");
707+
}
708+
709+
Map<String, StatisticsItemInfo> itemInfoMap = new HashMap<>();
710+
711+
for (ApplicationReport appReport : applicationMap.values()) {
712+
713+
YarnApplicationState appState = appReport.getYarnApplicationState();
714+
String appType = appReport.getApplicationType();
715+
716+
if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) {
717+
String itemInfoMapKey = appState.toString() + "_" + appType;
718+
StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null);
719+
if (itemInfo == null) {
720+
itemInfo = new StatisticsItemInfo(appState, appType, 1);
721+
} else {
722+
long newCount = itemInfo.getCount() + 1;
723+
itemInfo.setCount(newCount);
724+
}
725+
itemInfoMap.put(itemInfoMapKey, itemInfo);
726+
}
727+
}
728+
729+
return new ApplicationStatisticsInfo(itemInfoMap.values());
730+
}
731+
732+
@Override
733+
public AppActivitiesInfo getAppActivities(
734+
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities,
735+
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions,
736+
boolean summarize) {
737+
if (!isRunning) {
738+
throw new RuntimeException("RM is stopped");
739+
}
740+
741+
ApplicationId applicationId = ApplicationId.fromString(appId);
742+
if (!applicationMap.containsKey(applicationId)) {
743+
throw new NotFoundException("app with id: " + appId + " not found");
744+
}
745+
746+
SchedulerNode schedulerNode = TestUtils.getMockNode("host0", "rack", 1, 10240);
747+
748+
RMContext rmContext = Mockito.mock(RMContext.class);
749+
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf());
750+
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
751+
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none());
752+
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
753+
LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
754+
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
755+
Mockito.doReturn(rmApps).when(rmContext).getRMApps();
756+
757+
FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode;
758+
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 0);
759+
RMApp mockApp = Mockito.mock(RMApp.class);
760+
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId();
761+
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus();
762+
rmApps.put(appAttemptId.getApplicationId(), mockApp);
763+
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
764+
mock(ActiveUsersManager.class), rmContext);
765+
766+
ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
767+
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3);
768+
769+
int numActivities = 10;
770+
for (int i = 0; i < numActivities; i++) {
771+
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node,
772+
SystemClock.getInstance().getTime(), app);
773+
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app,
774+
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
775+
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED,
776+
ActivityLevel.NODE);
777+
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager,
778+
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
779+
}
780+
781+
Set<Integer> prioritiesInt =
782+
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet());
783+
Set<Long> allocationReqIds =
784+
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet());
785+
AppActivitiesInfo appActivitiesInfo = newActivitiesManager.
786+
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null,
787+
Integer.parseInt(limit), summarize, 3);
788+
789+
return appActivitiesInfo;
790+
}
664791
}

0 commit comments

Comments
 (0)