Skip to content

Commit d70f523

Browse files
YARN-9640. Slow event processing could cause too many attempt unregister events. Contributed by Bibin A Chundatt.
1 parent 07e3cf9 commit d70f523

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public class ApplicationMasterService extends AbstractService implements
9494
RecordFactoryProvider.getRecordFactory(null);
9595
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
9696
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
97+
private final ConcurrentHashMap<ApplicationAttemptId, Boolean>
98+
finishedAttemptCache = new ConcurrentHashMap<>();
9799
protected final RMContext rmContext;
98100
private final AMSProcessingChain amsProcessingChain;
99101
private boolean timelineServiceV2Enabled;
@@ -339,11 +341,14 @@ public FinishApplicationMasterResponse finishApplicationMaster(
339341
throw new ApplicationMasterNotRegisteredException(message);
340342
}
341343

342-
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
343344
FinishApplicationMasterResponse response =
344345
FinishApplicationMasterResponse.newInstance(false);
345-
this.amsProcessingChain.finishApplicationMaster(
346-
applicationAttemptId, request, response);
346+
if (finishedAttemptCache.putIfAbsent(applicationAttemptId, true)
347+
== null) {
348+
this.amsProcessingChain
349+
.finishApplicationMaster(applicationAttemptId, request, response);
350+
}
351+
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
347352
return response;
348353
}
349354
}
@@ -492,6 +497,7 @@ protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId,
492497
public void unregisterAttempt(ApplicationAttemptId attemptId) {
493498
LOG.info("Unregistering app attempt : " + attemptId);
494499
responseMap.remove(attemptId);
500+
finishedAttemptCache.remove(attemptId);
495501
rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
496502
}
497503

@@ -506,6 +512,8 @@ protected void serviceStop() throws Exception {
506512
if (this.server != null) {
507513
this.server.stop();
508514
}
515+
responseMap.clear();
516+
finishedAttemptCache.clear();
509517
super.serviceStop();
510518
}
511519

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java

+53
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package org.apache.hadoop.yarn.server.resourcemanager;
1818

1919
import com.google.common.collect.ImmutableMap;
20+
import org.apache.hadoop.yarn.event.Dispatcher;
21+
import org.apache.hadoop.yarn.event.DrainDispatcher;
22+
import org.apache.hadoop.yarn.event.Event;
23+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
2024
import org.slf4j.Logger;
2125
import org.slf4j.LoggerFactory;
2226
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -354,6 +358,55 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception {
354358
}
355359
}
356360

361+
@Test(timeout = 1200000)
362+
public void testRepeatedFinishApplicationMaster() throws Exception {
363+
364+
CountingDispatcher dispatcher = new CountingDispatcher();
365+
MockRM rm = new MockRM(conf) {
366+
@Override
367+
protected Dispatcher createDispatcher() {
368+
return dispatcher;
369+
}
370+
};
371+
372+
try {
373+
rm.start();
374+
// Register node1
375+
MockNM nm1 = rm.registerNode(DEFAULT_HOST + ":" + DEFAULT_PORT, 6 * GB);
376+
// Submit an application
377+
RMApp app1 = rm.submitApp(2048);
378+
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
379+
am1.registerAppAttempt();
380+
FinishApplicationMasterRequest req = FinishApplicationMasterRequest
381+
.newInstance(FinalApplicationStatus.FAILED, "", "");
382+
for (int i = 0; i < 10; i++) {
383+
am1.unregisterAppAttempt(req, false);
384+
}
385+
Assert.assertEquals("Expecting only one event", 1,
386+
dispatcher.getEventCount());
387+
} finally {
388+
rm.stop();
389+
}
390+
}
391+
392+
static class CountingDispatcher extends DrainDispatcher {
393+
private int eventreceived = 0;
394+
395+
@SuppressWarnings("rawtypes")
396+
@Override
397+
protected void dispatch(Event event) {
398+
if (event.getType() == RMAppAttemptEventType.UNREGISTERED) {
399+
eventreceived++;
400+
} else {
401+
super.dispatch(event);
402+
}
403+
}
404+
405+
public int getEventCount() {
406+
return eventreceived;
407+
}
408+
}
409+
357410
@Test(timeout = 3000000)
358411
public void testResourceTypes() throws Exception {
359412
HashMap<YarnConfiguration,

0 commit comments

Comments
 (0)