Skip to content

Commit 7ffb994

Browse files
committed
YARN-4355. NPE while processing localizer heartbeat. Contributed by Varun Saxena & Jonathan Hung.
1 parent 43aef30 commit 7ffb994

File tree

2 files changed

+132
-20
lines changed

2 files changed

+132
-20
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,6 @@ LocalizerHeartbeatResponse processHeartbeat(
10361036
List<LocalResourceStatus> remoteResourceStatuses) {
10371037
LocalizerHeartbeatResponse response =
10381038
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
1039-
10401039
String user = context.getUser();
10411040
ApplicationId applicationId =
10421041
context.getContainerId().getApplicationAttemptId().getApplicationId();
@@ -1059,14 +1058,19 @@ LocalizerHeartbeatResponse processHeartbeat(
10591058
LOG.error("Unknown resource reported: " + req);
10601059
continue;
10611060
}
1061+
LocalResourcesTracker tracker =
1062+
getLocalResourcesTracker(req.getVisibility(), user, applicationId);
1063+
if (tracker == null) {
1064+
// This is likely due to a race between heartbeat and
1065+
// app cleaning up.
1066+
continue;
1067+
}
10621068
switch (stat.getStatus()) {
10631069
case FETCH_SUCCESS:
10641070
// notify resource
10651071
try {
1066-
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
1067-
.handle(
1068-
new ResourceLocalizedEvent(req, stat.getLocalPath().toPath(),
1069-
stat.getLocalSize()));
1072+
tracker.handle(new ResourceLocalizedEvent(req,
1073+
stat.getLocalPath().toPath(), stat.getLocalSize()));
10701074
} catch (URISyntaxException e) { }
10711075

10721076
// unlocking the resource and removing it from scheduled resource
@@ -1080,9 +1084,8 @@ LocalizerHeartbeatResponse processHeartbeat(
10801084
final String diagnostics = stat.getException().toString();
10811085
LOG.warn(req + " failed: " + diagnostics);
10821086
fetchFailed = true;
1083-
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
1084-
.handle(new ResourceFailedLocalizationEvent(
1085-
req, diagnostics));
1087+
tracker.handle(new ResourceFailedLocalizationEvent(req,
1088+
diagnostics));
10861089

10871090
// unlocking the resource and removing it from scheduled resource
10881091
// list
@@ -1092,9 +1095,8 @@ LocalizerHeartbeatResponse processHeartbeat(
10921095
default:
10931096
LOG.info("Unknown status: " + stat.getStatus());
10941097
fetchFailed = true;
1095-
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
1096-
.handle(new ResourceFailedLocalizationEvent(
1097-
req, stat.getException().getMessage()));
1098+
tracker.handle(new ResourceFailedLocalizationEvent(req,
1099+
stat.getException().getMessage()));
10981100
break;
10991101
}
11001102
}
@@ -1114,10 +1116,14 @@ LocalizerHeartbeatResponse processHeartbeat(
11141116
LocalResource next = findNextResource();
11151117
if (next != null) {
11161118
try {
1117-
ResourceLocalizationSpec resource =
1118-
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
1119-
getPathForLocalization(next));
1120-
rsrcs.add(resource);
1119+
LocalResourcesTracker tracker = getLocalResourcesTracker(
1120+
next.getVisibility(), user, applicationId);
1121+
if (tracker != null) {
1122+
ResourceLocalizationSpec resource =
1123+
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
1124+
getPathForLocalization(next, tracker));
1125+
rsrcs.add(resource);
1126+
}
11211127
} catch (IOException e) {
11221128
LOG.error("local path for PRIVATE localization could not be " +
11231129
"found. Disks might have failed.", e);
@@ -1136,14 +1142,12 @@ LocalizerHeartbeatResponse processHeartbeat(
11361142
return response;
11371143
}
11381144

1139-
private Path getPathForLocalization(LocalResource rsrc) throws IOException,
1140-
URISyntaxException {
1145+
private Path getPathForLocalization(LocalResource rsrc,
1146+
LocalResourcesTracker tracker) throws IOException, URISyntaxException {
11411147
String user = context.getUser();
11421148
ApplicationId appId =
11431149
context.getContainerId().getApplicationAttemptId().getApplicationId();
11441150
LocalResourceVisibility vis = rsrc.getVisibility();
1145-
LocalResourcesTracker tracker =
1146-
getLocalResourcesTracker(vis, user, appId);
11471151
String cacheDirectory = null;
11481152
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
11491153
cacheDirectory = getUserFileCachePath(user);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertNull;
2424
import static org.junit.Assert.assertTrue;
25+
import static org.junit.Assert.fail;
2526
import static org.mockito.Matchers.anyBoolean;
2627
import static org.mockito.Matchers.anyInt;
2728
import static org.mockito.Matchers.anyLong;
@@ -147,7 +148,6 @@
147148
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
148149
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
149150
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
150-
import org.apache.hadoop.yarn.util.ConverterUtils;
151151
import org.junit.After;
152152
import org.junit.Before;
153153
import org.junit.BeforeClass;
@@ -1482,6 +1482,114 @@ public void testPublicResourceInitializesLocalDir() throws Exception {
14821482
}
14831483
}
14841484

1485+
@Test(timeout = 20000)
1486+
@SuppressWarnings("unchecked")
1487+
public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
1488+
conf.set(YarnConfiguration.NM_LOCAL_DIRS,
1489+
lfs.makeQualified(new Path(basedir, 0 + "")).toString());
1490+
// Start dispatcher.
1491+
DrainDispatcher dispatcher = new DrainDispatcher();
1492+
dispatcher.init(conf);
1493+
dispatcher.start();
1494+
dispatcher.register(ApplicationEventType.class, mock(EventHandler.class));
1495+
dispatcher.register(ContainerEventType.class, mock(EventHandler.class));
1496+
1497+
DummyExecutor exec = new DummyExecutor();
1498+
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
1499+
dirsHandler.init(conf);
1500+
// Start resource localization service.
1501+
ResourceLocalizationService rawService = new ResourceLocalizationService(
1502+
dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext);
1503+
ResourceLocalizationService spyService = spy(rawService);
1504+
doReturn(mockServer).when(spyService).createServer();
1505+
doReturn(lfs).when(spyService).
1506+
getLocalFileContext(isA(Configuration.class));
1507+
try {
1508+
spyService.init(conf);
1509+
spyService.start();
1510+
1511+
// Init application resources.
1512+
final Application app = mock(Application.class);
1513+
final ApplicationId appId = BuilderUtils.newApplicationId(1234567890L, 3);
1514+
when(app.getUser()).thenReturn("user0");
1515+
when(app.getAppId()).thenReturn(appId);
1516+
when(app.toString()).thenReturn(appId.toString());
1517+
spyService.handle(new ApplicationLocalizationEvent(
1518+
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
1519+
dispatcher.await();
1520+
1521+
// Initialize localizer.
1522+
Random r = new Random();
1523+
long seed = r.nextLong();
1524+
System.out.println("SEED: " + seed);
1525+
r.setSeed(seed);
1526+
final Container c = getMockContainer(appId, 46, "user0");
1527+
FSDataOutputStream out =
1528+
new FSDataOutputStream(new DataOutputBuffer(), null);
1529+
doReturn(out).when(spylfs).createInternal(isA(Path.class),
1530+
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
1531+
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
1532+
anyBoolean());
1533+
final LocalResource resource1 = getAppMockedResource(r);
1534+
final LocalResource resource2 = getAppMockedResource(r);
1535+
1536+
// Send localization requests for container.
1537+
// 2 resources generated with APPLICATION visibility.
1538+
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
1539+
final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
1540+
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
1541+
new HashMap<LocalResourceVisibility,
1542+
Collection<LocalResourceRequest>>();
1543+
List<LocalResourceRequest> appResourceList = Arrays.asList(req1, req2);
1544+
rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList);
1545+
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
1546+
dispatcher.await();
1547+
1548+
// Wait for localization to begin.
1549+
exec.waitForLocalizers(1);
1550+
final String containerIdStr = c.getContainerId().toString();
1551+
LocalizerRunner locRunnerForContainer =
1552+
spyService.getLocalizerRunner(containerIdStr);
1553+
// Heartbeats from container localizer
1554+
LocalResourceStatus rsrcSuccess = mock(LocalResourceStatus.class);
1555+
LocalizerStatus stat = mock(LocalizerStatus.class);
1556+
when(stat.getLocalizerId()).thenReturn(containerIdStr);
1557+
when(rsrcSuccess.getResource()).thenReturn(resource1);
1558+
when(rsrcSuccess.getLocalSize()).thenReturn(4344L);
1559+
when(rsrcSuccess.getLocalPath()).thenReturn(getPath("/some/path"));
1560+
when(rsrcSuccess.getStatus()).
1561+
thenReturn(ResourceStatusType.FETCH_SUCCESS);
1562+
when(stat.getResources()).
1563+
thenReturn(Collections.<LocalResourceStatus>emptyList());
1564+
1565+
// First heartbeat which schedules first resource.
1566+
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
1567+
assertEquals("NM should tell localizer to be LIVE in Heartbeat.",
1568+
LocalizerAction.LIVE, response.getLocalizerAction());
1569+
1570+
// Cleanup application.
1571+
spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
1572+
spyService.handle(new ApplicationLocalizationEvent(
1573+
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
1574+
dispatcher.await();
1575+
try {
1576+
// Directly send heartbeat to introduce race as app is being cleaned up.
1577+
locRunnerForContainer.processHeartbeat(
1578+
Collections.singletonList(rsrcSuccess));
1579+
} catch (Exception e) {
1580+
fail("Exception should not have been thrown on processing heartbeat");
1581+
}
1582+
// Send another heartbeat.
1583+
response = spyService.heartbeat(stat);
1584+
assertEquals("NM should tell localizer to DIE in Heartbeat.",
1585+
LocalizerAction.DIE, response.getLocalizerAction());
1586+
exec.setStopLocalization();
1587+
} finally {
1588+
spyService.stop();
1589+
dispatcher.stop();
1590+
}
1591+
}
1592+
14851593
@Test(timeout=20000)
14861594
@SuppressWarnings("unchecked") // mocked generics
14871595
public void testFailedPublicResource() throws Exception {

0 commit comments

Comments
 (0)