Skip to content

Commit 36c4be8

Browse files
hotcodemachaAshutosh Gupta
andauthored
MAPREDUCE-7369. Fixed MapReduce tasks timing out when spends more time on MultipleOutputs#close (#4247)
Contributed by Ravuri Sushma sree. Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
1 parent 10fc865 commit 36c4be8

File tree

4 files changed

+67
-19
lines changed

4 files changed

+67
-19
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import java.util.concurrent.ConcurrentMap;
2929
import java.util.concurrent.atomic.AtomicReference;
3030

31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import org.apache.hadoop.classification.VisibleForTesting;
3135
import org.apache.hadoop.conf.Configuration;
3236
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
3337
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -50,8 +54,8 @@
5054
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
5155
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
5256
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
53-
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
5457
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
58+
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
5559
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
5660
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
5761
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -61,10 +65,6 @@
6165
import org.apache.hadoop.util.StringInterner;
6266
import org.apache.hadoop.util.Time;
6367
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
64-
import org.slf4j.Logger;
65-
import org.slf4j.LoggerFactory;
66-
67-
import org.apache.hadoop.classification.VisibleForTesting;
6868

6969
/**
7070
* This class is responsible for talking to the task umblical.
@@ -409,6 +409,11 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
409409
if (LOG.isDebugEnabled()) {
410410
LOG.debug("Ping from " + taskAttemptID.toString());
411411
}
412+
// Consider ping from the tasks for liveliness check
413+
if (getConfig().getBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK,
414+
MRJobConfig.DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK)) {
415+
taskHeartbeatHandler.progressing(yarnAttemptID);
416+
}
412417
return feedback;
413418
}
414419

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,30 @@
1717
*/
1818
package org.apache.hadoop.mapred;
1919

20-
import java.util.function.Supplier;
21-
import org.apache.hadoop.mapred.Counters.Counter;
22-
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
23-
2420
import java.io.IOException;
2521
import java.util.ArrayList;
2622
import java.util.Arrays;
2723
import java.util.List;
2824
import java.util.concurrent.ConcurrentMap;
2925
import java.util.concurrent.atomic.AtomicReference;
26+
import java.util.function.Supplier;
27+
28+
import org.junit.After;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.mockito.ArgumentCaptor;
32+
import org.mockito.Captor;
33+
import org.mockito.Mock;
34+
import org.mockito.junit.MockitoJUnitRunner;
3035

3136
import org.apache.hadoop.conf.Configuration;
3237
import org.apache.hadoop.fs.Path;
38+
import org.apache.hadoop.mapred.Counters.Counter;
3339
import org.apache.hadoop.mapreduce.MRJobConfig;
3440
import org.apache.hadoop.mapreduce.TaskType;
3541
import org.apache.hadoop.mapreduce.TypeConverter;
3642
import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
43+
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
3744
import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
3845
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
3946
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -48,9 +55,9 @@
4855
import org.apache.hadoop.mapreduce.v2.app.job.Job;
4956
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
5057
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
58+
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
5159
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
5260
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
53-
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
5461
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
5562
import org.apache.hadoop.test.GenericTestUtils;
5663
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -60,17 +67,22 @@
6067
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
6168
import org.apache.hadoop.yarn.util.ControlledClock;
6269
import org.apache.hadoop.yarn.util.SystemClock;
63-
import org.junit.After;
64-
import org.junit.Test;
65-
import org.junit.runner.RunWith;
66-
import org.mockito.ArgumentCaptor;
67-
import org.mockito.Captor;
68-
import org.mockito.Mock;
69-
import org.mockito.junit.MockitoJUnitRunner;
7070

7171
import static org.assertj.core.api.Assertions.assertThat;
72-
import static org.junit.Assert.*;
73-
import static org.mockito.Mockito.*;
72+
import static org.junit.Assert.assertEquals;
73+
import static org.junit.Assert.assertFalse;
74+
import static org.junit.Assert.assertNotNull;
75+
import static org.junit.Assert.assertNull;
76+
import static org.junit.Assert.assertTrue;
77+
import static org.junit.Assert.fail;
78+
import static org.mockito.Mockito.any;
79+
import static org.mockito.Mockito.doReturn;
80+
import static org.mockito.Mockito.eq;
81+
import static org.mockito.Mockito.mock;
82+
import static org.mockito.Mockito.never;
83+
import static org.mockito.Mockito.times;
84+
import static org.mockito.Mockito.verify;
85+
import static org.mockito.Mockito.when;
7486

7587
/**
7688
* Tests the behavior of TaskAttemptListenerImpl.
@@ -417,6 +429,23 @@ public void testStatusUpdateProgress()
417429
verify(hbHandler).progressing(eq(attemptId));
418430
}
419431

432+
@Test
433+
public void testPingUpdateProgress() throws IOException, InterruptedException {
434+
configureMocks();
435+
Configuration conf = new Configuration();
436+
conf.setBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, true);
437+
listener.init(conf);
438+
listener.start();
439+
listener.registerPendingTask(task, wid);
440+
listener.registerLaunchedTask(attemptId, wid);
441+
verify(hbHandler).register(attemptId);
442+
443+
// make sure a ping does report progress
444+
AMFeedback feedback = listener.statusUpdate(attemptID, null);
445+
assertTrue(feedback.getTaskFound());
446+
verify(hbHandler, times(1)).progressing(eq(attemptId));
447+
}
448+
420449
@Test
421450
public void testSingleStatusUpdate()
422451
throws IOException, InterruptedException {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,13 @@ public interface MRJobConfig {
919919
MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
920920
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
921921

922+
/** Whether to consider ping from tasks in liveliness check. */
923+
String MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK =
924+
"mapreduce.task.ping-for-liveliness-check.enabled";
925+
boolean DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK
926+
= false;
927+
928+
922929
/**
923930
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
924931
* milliseconds before aborting. During this interval, AM will still try

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,13 @@
286286
</description>
287287
</property>
288288

289+
<property>
290+
<name>mapreduce.task.ping-for-liveliness-check.enabled</name>
291+
<value>false</value>
292+
<description>Whether to consider ping from tasks in liveliness check.
293+
</description>
294+
</property>
295+
289296
<property>
290297
<name>mapreduce.map.memory.mb</name>
291298
<value>-1</value>

0 commit comments

Comments
 (0)