Skip to content

Commit

Permalink
YARN-11228. [Federation] Add getAppAttempts, getAppAttempt REST APIs …
Browse files Browse the repository at this point in the history
…for Router.
  • Loading branch information
slfan1989 committed Aug 4, 2022
1 parent 66dec9d commit 01c679e
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,30 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,

@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
throw new NotImplementedException("Code is not implemented");
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+ applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
} catch (IllegalArgumentException e) {
String msg = String.format("Unable to get the AppAttempt appId: %s.", appId);
RouterServerUtil.logAndThrowRunTimeException(msg, e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}

return null;
}

@Override
Expand All @@ -1377,7 +1400,35 @@ public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}

try {
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+ applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempt(req, res, appId, appAttemptId);
} catch (IllegalArgumentException e) {
String msg = String.format("Unable to get the AppAttempt appId: %s, appAttemptId: %s.",
appId, appAttemptId);
RouterServerUtil.logAndThrowRunTimeException(msg, e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}

return null;
}

@Override
Expand Down Expand Up @@ -1432,8 +1483,8 @@ public ContainerInfo getContainer(HttpServletRequest req,
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+ applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
Expand Down Expand Up @@ -1491,8 +1542,8 @@ public Response signalToContainer(String containerId, String command,
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = "
+ applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
Expand Down Expand Up @@ -1565,7 +1616,8 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
* @return HomeSubCluster
* @throws YarnException on failure
*/
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId)
throws YarnException {
SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
Expand All @@ -67,6 +72,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
Expand Down Expand Up @@ -412,4 +419,50 @@ public Response signalToContainer(String containerId, String command,

return Response.status(Status.OK).build();
}

@Override
public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)), "user",
"queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);

ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
"host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
newApplicationReport.getCurrentApplicationAttemptId(), 1));

return new AppAttemptInfo(attempt);
}

@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

AppAttemptsInfo infos = new AppAttemptsInfo();
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo
appAttemptInfo1 = new org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo();
infos.add(appAttemptInfo1);

return infos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -717,4 +718,37 @@ public void testGetContainer()
appId.toString(), appAttemptId.toString(), "0");
Assert.assertNotNull(containerInfo);
}

@Test
public void testGetAppAttempts()
throws IOException, InterruptedException, YarnException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());

Assert.assertNotNull(interceptor.submitApplication(context, null));

AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(null, appId.toString());
Assert.assertNotNull(appAttemptsInfo);
Assert.assertTrue(!appAttemptsInfo.getAttempts().isEmpty());
}

@Test
public void testGetAppAttempt()
throws IOException, InterruptedException, YarnException {
// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Assert.assertNotNull(interceptor.submitApplication(context, null));
ApplicationAttemptId appAttemptIdExcept = ApplicationAttemptId.newInstance(appId, 1);

org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(),
appAttemptIdExcept.toString());
Assert.assertNotNull(appAttemptInfo);

Assert.assertEquals(appAttemptIdExcept, appAttemptInfo.getAppAttemptId());
}
}

0 comments on commit 01c679e

Please sign in to comment.