Skip to content

Commit 2a5a313

Browse files
committed
MAPREDUCE-6697. Concurrent task limits should only be applied when necessary. Contributed by Nathan Roberts.
(cherry picked from commit a5c0476)
1 parent ce57458 commit 2a5a313

File tree

2 files changed

+73
-6
lines changed

2 files changed

+73
-6
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,8 @@ void processFinishedContainer(ContainerStatus container) {
889889

890890
private void applyConcurrentTaskLimits() {
891891
int numScheduledMaps = scheduledRequests.maps.size();
892-
if (maxRunningMaps > 0 && numScheduledMaps > 0) {
892+
if (maxRunningMaps > 0 && numScheduledMaps > 0 &&
893+
getJob().getTotalMaps() > maxRunningMaps) {
893894
int maxRequestedMaps = Math.max(0,
894895
maxRunningMaps - assignedRequests.maps.size());
895896
int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
@@ -906,7 +907,8 @@ private void applyConcurrentTaskLimits() {
906907
}
907908

908909
int numScheduledReduces = scheduledRequests.reduces.size();
909-
if (maxRunningReduces > 0 && numScheduledReduces > 0) {
910+
if (maxRunningReduces > 0 && numScheduledReduces > 0 &&
911+
getJob().getTotalReduces() > maxRunningReduces) {
910912
int maxRequestedReduces = Math.max(0,
911913
maxRunningReduces - assignedRequests.reduces.size());
912914
int reduceRequestLimit = Math.min(maxRequestedReduces,

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2775,15 +2775,78 @@ public Token<AMRMTokenIdentifier> run() throws Exception {
27752775
new Text(rmAddr), ugiToken.getService());
27762776
}
27772777

2778+
@Test
2779+
public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
2780+
final int MAP_COUNT = 1;
2781+
final int REDUCE_COUNT = 1;
2782+
final int MAP_LIMIT = 1;
2783+
final int REDUCE_LIMIT = 1;
2784+
Configuration conf = new Configuration();
2785+
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
2786+
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
2787+
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
2788+
ApplicationId appId = ApplicationId.newInstance(1, 1);
2789+
ApplicationAttemptId appAttemptId =
2790+
ApplicationAttemptId.newInstance(appId, 1);
2791+
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
2792+
Job mockJob = mock(Job.class);
2793+
when(mockJob.getReport()).thenReturn(
2794+
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
2795+
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
2796+
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
2797+
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
2798+
2799+
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
2800+
MyContainerAllocator allocator =
2801+
new MyContainerAllocator(null, conf, appAttemptId, mockJob,
2802+
SystemClock.getInstance()) {
2803+
@Override
2804+
protected void register() {
2805+
}
2806+
2807+
@Override
2808+
protected ApplicationMasterProtocol createSchedulerProxy() {
2809+
return mockScheduler;
2810+
}
2811+
2812+
@Override
2813+
protected void setRequestLimit(Priority priority,
2814+
Resource capability, int limit) {
2815+
Assert.fail("setRequestLimit() should not be invoked");
2816+
}
2817+
};
2818+
2819+
// create some map requests
2820+
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
2821+
for (int i = 0; i < reqMapEvents.length; ++i) {
2822+
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
2823+
}
2824+
allocator.sendRequests(Arrays.asList(reqMapEvents));
2825+
// create some reduce requests
2826+
ContainerRequestEvent[] reqReduceEvents =
2827+
new ContainerRequestEvent[REDUCE_COUNT];
2828+
for (int i = 0; i < reqReduceEvents.length; ++i) {
2829+
reqReduceEvents[i] =
2830+
createReq(jobId, i, 1024, new String[] {}, false, true);
2831+
}
2832+
allocator.sendRequests(Arrays.asList(reqReduceEvents));
2833+
allocator.schedule();
2834+
allocator.schedule();
2835+
allocator.schedule();
2836+
allocator.close();
2837+
}
2838+
27782839
@Test
27792840
public void testConcurrentTaskLimits() throws Exception {
2841+
final int MAP_COUNT = 5;
2842+
final int REDUCE_COUNT = 2;
27802843
final int MAP_LIMIT = 3;
27812844
final int REDUCE_LIMIT = 1;
27822845
LOG.info("Running testConcurrentTaskLimits");
27832846
Configuration conf = new Configuration();
27842847
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
27852848
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
2786-
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
2849+
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
27872850
ApplicationId appId = ApplicationId.newInstance(1, 1);
27882851
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
27892852
appId, 1);
@@ -2792,6 +2855,9 @@ public void testConcurrentTaskLimits() throws Exception {
27922855
when(mockJob.getReport()).thenReturn(
27932856
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
27942857
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
2858+
when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
2859+
when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
2860+
27952861
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
27962862
MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
27972863
appAttemptId, mockJob, SystemClock.getInstance()) {
@@ -2806,14 +2872,13 @@ protected ApplicationMasterProtocol createSchedulerProxy() {
28062872
};
28072873

28082874
// create some map requests
2809-
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
2875+
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
28102876
for (int i = 0; i < reqMapEvents.length; ++i) {
28112877
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
28122878
}
28132879
allocator.sendRequests(Arrays.asList(reqMapEvents));
2814-
28152880
// create some reduce requests
2816-
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
2881+
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
28172882
for (int i = 0; i < reqReduceEvents.length; ++i) {
28182883
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
28192884
false, true);

0 commit comments

Comments
 (0)