Skip to content

Commit d9a6792

Browse files
authored
YARN-11011. Make YARN Router throw Exception to client clearly. (#6211) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent 72d7b43 commit d9a6792

File tree

4 files changed

+102
-14
lines changed

4 files changed

+102
-14
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object
125125
public static void logAndThrowException(String errMsg, Throwable t)
126126
throws YarnException {
127127
if (t != null) {
128-
LOG.error(errMsg, t);
129-
throw new YarnException(errMsg, t);
128+
String newErrMsg = getErrorMsg(errMsg, t);
129+
LOG.error(newErrMsg, t);
130+
throw new YarnException(newErrMsg, t);
130131
} else {
131132
LOG.error(errMsg);
132133
throw new YarnException(errMsg);
@@ -146,6 +147,13 @@ public static void logAndThrowException(String errMsg) throws YarnException {
146147
throw new YarnException(errMsg);
147148
}
148149

150+
private static String getErrorMsg(String errMsg, Throwable t) {
151+
if (t.getMessage() != null) {
152+
return errMsg + "" + t.getMessage();
153+
}
154+
return errMsg;
155+
}
156+
149157
public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
150158
String interceptorClassName, Class<R> clazz) {
151159

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2525
import java.io.IOException;
2626
import java.lang.reflect.Method;
27+
import java.lang.reflect.InvocationTargetException;
2728
import java.util.ArrayList;
2829
import java.util.Collection;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Random;
3233
import java.util.TreeMap;
33-
import java.util.Set;
3434
import java.util.concurrent.BlockingQueue;
3535
import java.util.concurrent.Callable;
3636
import java.util.concurrent.ConcurrentHashMap;
@@ -842,13 +842,27 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
842842
// Generate parallel Callable tasks
843843
for (SubClusterId subClusterId : subClusterIds) {
844844
callables.add(() -> {
845-
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
846-
String methodName = request.getMethodName();
847-
Class<?>[] types = request.getTypes();
848-
Object[] params = request.getParams();
849-
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
850-
Object result = method.invoke(protocol, params);
851-
return Pair.of(subClusterId, result);
845+
try {
846+
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
847+
String methodName = request.getMethodName();
848+
Class<?>[] types = request.getTypes();
849+
Object[] params = request.getParams();
850+
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
851+
Object result = method.invoke(protocol, params);
852+
return Pair.of(subClusterId, result);
853+
} catch (Exception e) {
854+
Throwable cause = e.getCause();
855+
// We use Callable. If the exception thrown here is InvocationTargetException,
856+
// it is a wrapped exception. We need to get the real cause of the error.
857+
if (cause != null && cause instanceof InvocationTargetException) {
858+
cause = cause.getCause();
859+
}
860+
String errMsg = (cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN";
861+
YarnException yarnException =
862+
new YarnException(String.format("subClusterId %s exec %s error %s.",
863+
subClusterId, request.getMethodName(), errMsg), e);
864+
return Pair.of(subClusterId, yarnException);
865+
}
852866
});
853867
}
854868

@@ -862,8 +876,11 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
862876
Pair<SubClusterId, Object> pair = future.get();
863877
subClusterId = pair.getKey();
864878
Object result = pair.getValue();
879+
if (result instanceof YarnException) {
880+
throw YarnException.class.cast(result);
881+
}
865882
results.put(subClusterId, clazz.cast(result));
866-
} catch (InterruptedException | ExecutionException e) {
883+
} catch (InterruptedException | ExecutionException | YarnException e) {
867884
Throwable cause = e.getCause();
868885
LOG.error("Cannot execute {} on {} : {}", request.getMethodName(),
869886
subClusterId.getId(), cause.getMessage());
@@ -877,9 +894,8 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
877894
// All sub-clusters return results to be considered successful,
878895
// otherwise an exception will be thrown.
879896
if (exceptions != null && !exceptions.isEmpty()) {
880-
Set<SubClusterId> subClusterIdSets = exceptions.keySet();
881-
throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
882-
StringUtils.join(subClusterIdSets, ","));
897+
throw new YarnException("invokeConcurrent Failed = " +
898+
StringUtils.join(exceptions.values(), ","));
883899
}
884900

885901
// return result

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
3535
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
3636
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
37+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
3738
import org.apache.hadoop.yarn.api.records.ApplicationId;
3839
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
3940
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -353,4 +354,60 @@ private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCl
353354
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
354355
Assert.assertEquals(expectSubCluster, respSubClusterId);
355356
}
357+
358+
/**
 * Sets up the cluster with the sub-clusters bad1 and bad2, allows one submit
 * retry, and expects submitApplication to fail with a YarnException matched
 * against the text "RM is stopped".
 */
@Test
359+
public void testSubmitApplicationTwoBadNodeWithRealError() throws Exception {
360+
LOG.info("Test submitApplication with two bad SubClusters.");
361+
setupCluster(Arrays.asList(bad1, bad2));
362+
interceptor.setNumSubmitRetries(1);
363+
364+
final ApplicationId appId =
365+
ApplicationId.newInstance(System.currentTimeMillis(), 5);
366+
367+
final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
368+
369+
LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
370+
() -> interceptor.submitApplication(request));
371+
}
372+
373+
/**
 * Sets up the cluster with only the sub-cluster bad1, sets the number of submit
 * retries to zero, and expects submitApplication to fail with a YarnException
 * matched against the text "RM is stopped".
 */
@Test
374+
public void testSubmitApplicationOneBadNodeWithRealError() throws Exception {
375+
LOG.info("Test submitApplication with one bad SubClusters.");
376+
setupCluster(Arrays.asList(bad1));
377+
interceptor.setNumSubmitRetries(0);
378+
379+
final ApplicationId appId =
380+
ApplicationId.newInstance(System.currentTimeMillis(), 6);
381+
382+
final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
383+
384+
LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
385+
() -> interceptor.submitApplication(request));
386+
}
387+
388+
/**
 * Sets up the cluster with the sub-clusters bad1 and bad2 and calls
 * getClusterMetrics twice, each time expecting a YarnException. The first call
 * is matched against the per-sub-cluster error text for "subClusterId 1", the
 * second against the text for "subClusterId 2".
 */
@Test
389+
public void testGetClusterMetricsTwoBadNodeWithRealError() throws Exception {
390+
LOG.info("Test getClusterMetrics with two bad SubClusters.");
391+
setupCluster(Arrays.asList(bad1, bad2));
392+
GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();
393+
394+
LambdaTestUtils.intercept(YarnException.class,
395+
"subClusterId 1 exec getClusterMetrics error RM is stopped.",
396+
() -> interceptor.getClusterMetrics(request));
397+
398+
LambdaTestUtils.intercept(YarnException.class,
399+
"subClusterId 2 exec getClusterMetrics error RM is stopped.",
400+
() -> interceptor.getClusterMetrics(request));
401+
}
402+
403+
/**
 * Sets up the cluster with only the sub-cluster bad1 and expects
 * getClusterMetrics to fail with a YarnException matched against the
 * per-sub-cluster error text for "subClusterId 1".
 */
@Test
404+
public void testGetClusterMetricsOneBadNodeWithRealError() throws Exception {
405+
LOG.info("Test getClusterMetrics with one bad SubClusters.");
406+
setupCluster(Arrays.asList(bad1));
407+
GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();
408+
409+
LambdaTestUtils.intercept(YarnException.class,
410+
"subClusterId 1 exec getClusterMetrics error RM is stopped.",
411+
() -> interceptor.getClusterMetrics(request));
412+
}
356413
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
3838
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
3939
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
40+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
41+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
4042
import org.apache.hadoop.yarn.api.records.NodeAttribute;
4143
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
4244
import org.apache.hadoop.yarn.api.records.Resource;
@@ -126,6 +128,11 @@ public SubmitApplicationResponse submitApplication(
126128
throw new ConnectException("RM is stopped");
127129
}
128130

131+
/**
 * Test stub that never returns metrics: every call throws a YarnException
 * with the message "RM is stopped".
 * NOTE(review): the enclosing class is not visible in this hunk; presumably it
 * is the mock used for the "bad" sub-clusters -- confirm.
 */
@Override
132+
public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)
133+
throws YarnException {
134+
throw new YarnException("RM is stopped");
135+
}
129136
}
130137

131138
/**

0 commit comments

Comments
 (0)