Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
Expand All @@ -50,8 +54,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
Expand All @@ -61,10 +65,6 @@
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
* This class is responsible for talking to the task umbilical.
Expand Down Expand Up @@ -409,6 +409,11 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
if (LOG.isDebugEnabled()) {
LOG.debug("Ping from " + taskAttemptID.toString());
}
// Consider ping from the tasks for liveliness check
if (getConfig().getBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK,
MRJobConfig.DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK)) {
taskHeartbeatHandler.progressing(yarnAttemptID);
}
return feedback;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,30 @@
*/
package org.apache.hadoop.mapred;

import java.util.function.Supplier;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
Expand All @@ -48,9 +55,9 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.Dispatcher;
Expand All @@ -60,17 +67,22 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests the behavior of TaskAttemptListenerImpl.
Expand Down Expand Up @@ -417,6 +429,23 @@ public void testStatusUpdateProgress()
verify(hbHandler).progressing(eq(attemptId));
}

/**
 * Verifies that, when ping-based liveliness checking is switched on, a ping
 * sent to the listener is reported to the heartbeat handler as progress for
 * the launched attempt.
 */
@Test
public void testPingUpdateProgress() throws IOException, InterruptedException {
  configureMocks();

  // Opt in to the feature before the listener reads its configuration.
  Configuration pingEnabledConf = new Configuration();
  pingEnabledConf.setBoolean(
      MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, true);
  listener.init(pingEnabledConf);
  listener.start();

  // The attempt has to be registered and launched before it can ping.
  listener.registerPendingTask(task, wid);
  listener.registerLaunchedTask(attemptId, wid);
  verify(hbHandler).register(attemptId);

  // This test issues the ping as a status update with a null status; the
  // handler must be told exactly once that the attempt is progressing.
  AMFeedback pingFeedback = listener.statusUpdate(attemptID, null);
  assertTrue(pingFeedback.getTaskFound());
  verify(hbHandler).progressing(eq(attemptId));
}

@Test
public void testSingleStatusUpdate()
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,13 @@ public interface MRJobConfig {
MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;

/**
 * Configuration key controlling whether pings from tasks are considered in
 * the task liveliness check. When enabled, a ping received by the task
 * attempt listener is reported to the task heartbeat handler as progress.
 */
String MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK =
"mapreduce.task.ping-for-liveliness-check.enabled";
/** Default: pings from tasks are not considered in the liveliness check. */
boolean DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK = false;


/**
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
* milliseconds before aborting. During this interval, AM will still try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@
</description>
</property>

<property>
<name>mapreduce.task.ping-for-liveliness-check.enabled</name>
<value>false</value>
<description>Whether to consider pings from tasks in the task liveliness check.
</description>
</property>

<property>
<name>mapreduce.map.memory.mb</name>
<value>-1</value>
Expand Down