Commit b9edad6
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li
1 parent 128ace1
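
In short: before this change, the reducer reported its shuffle transfer rate as total bytes shuffled divided by wall-clock time since the shuffle began, so time spent idle waiting for map outputs to become available dragged the reported bandwidth down. The patch instead tracks the union of the actual copy intervals and divides by that. A minimal sketch of the difference, with hypothetical numbers and class name (not code from this patch):

// Illustrative only: 60 MB copied in two 30 s bursts,
// separated by a 240 s wait for the next wave of map outputs.
public class ShuffleRateSketch {
  public static void main(String[] args) {
    float mb = 60f;              // total MB shuffled
    float wallClockSecs = 300f;  // time since shuffle start, waiting included
    float copySecs = 60f;        // union of the two copy intervals

    // Pre-patch: waiting time counts against the rate.
    System.out.printf("old: %.2f MB/s%n", mb / wallClockSecs); // old: 0.20 MB/s

    // Post-patch: only time a copy was actually in flight counts.
    System.out.printf("new: %.2f MB/s%n", mb / copySecs);      // new: 1.00 MB/s
  }
}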

Showing 5 changed files with 257 additions and 14 deletions.

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
@@ -429,6 +429,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin
     Chang via jlowe)
 
+    MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
+    for maps (Siqi Li via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Lines changed: 1 addition & 1 deletion
@@ -544,7 +544,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
       retryStartTime = 0;
 
       scheduler.copySucceeded(mapId, host, compressedLength,
-                              endTime - startTime, mapOutput);
+                              startTime, endTime, mapOutput);
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
       }
     }
 
-    scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
+    scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
         mapOutput);
     return true; // successful fetch.
   }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

Lines changed: 95 additions & 12 deletions
@@ -23,6 +23,7 @@
 import java.net.UnknownHostException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -64,7 +65,8 @@ protected Long initialValue() {
   private static final long INITIAL_PENALTY = 10000;
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   private final static int REPORT_FAILURE_LIMIT = 10;
-
+  private static final float BYTES_PER_MILLIS_TO_MBS = 1000f / 1024 / 1024;
+
   private final boolean[] finishedMaps;
 
   private final int totalMaps;
@@ -92,6 +94,8 @@ protected Long initialValue() {
   private final long startTime;
   private long lastProgressTime;
 
+  private final CopyTimeTracker copyTimeTracker;
+
   private volatile int maxMapRuntime = 0;
   private final int maxFailedUniqueFetches;
   private final int maxFetchFailuresBeforeReporting;
@@ -112,7 +116,7 @@ public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                             Counters.Counter failedShuffleCounter) {
     totalMaps = job.getNumMapTasks();
     abortFailureLimit = Math.max(30, totalMaps / 10);
-
+    copyTimeTracker = new CopyTimeTracker();
     remainingMaps = totalMaps;
     finishedMaps = new boolean[remainingMaps];
     this.reporter = reporter;
@@ -180,7 +184,8 @@ static URI getBaseURI(TaskAttemptID reduceId, String url) {
   public synchronized void copySucceeded(TaskAttemptID mapId,
                                          MapHost host,
                                          long bytes,
-                                         long millis,
+                                         long startMillis,
+                                         long endMillis,
                                          MapOutput<K,V> output
                                          ) throws IOException {
     failureCounts.remove(mapId);
@@ -195,29 +200,48 @@ public synchronized void copySucceeded(TaskAttemptID mapId,
       notifyAll();
     }
 
-      // update the status
+      // update single copy task status
+      long copyMillis = (endMillis - startMillis);
+      if (copyMillis == 0) copyMillis = 1;
+      float bytesPerMillis = (float) bytes / copyMillis;
+      float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
+      String individualProgress = "copy task(" + mapId + " succeeded"
+          + " at " + mbpsFormat.format(transferRate) + " MB/s)";
+      // update the aggregated status
+      copyTimeTracker.add(startMillis, endMillis);
+
       totalBytesShuffledTillNow += bytes;
-      updateStatus();
+      updateStatus(individualProgress);
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = Time.monotonicNow();
       LOG.debug("map " + mapId + " done " + status.getStateString());
     }
   }
 
-  private void updateStatus() {
-    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+  private synchronized void updateStatus(String individualProgress) {
     int mapsDone = totalMaps - remainingMaps;
-    long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1;
-
-    float transferRate = mbs / secsSinceStart;
+    long totalCopyMillis = copyTimeTracker.getCopyMillis();
+    if (totalCopyMillis == 0) totalCopyMillis = 1;
+    float bytesPerMillis = (float) totalBytesShuffledTillNow / totalCopyMillis;
+    float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
    progress.set((float) mapsDone / totalMaps);
     String statusString = mapsDone + " / " + totalMaps + " copied.";
     status.setStateString(statusString);
 
-    progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (individualProgress != null) {
+      progress.setStatus(individualProgress + " Aggregated copy rate(" +
+          mapsDone + " of " + totalMaps + " at " +
+          mbpsFormat.format(transferRate) + " MB/s)");
+    } else {
+      progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+    }
   }
 
+  private void updateStatus() {
+    updateStatus(null);
+  }
+
   public synchronized void hostFailed(String hostname) {
     if (hostFailures.containsKey(hostname)) {
       IntWritable x = hostFailures.get(hostname);
@@ -520,4 +544,63 @@ public void close() throws InterruptedException {
   public int getMaxHostFailures() {
     return maxHostFailures;
   }
+
+  private static class CopyTimeTracker {
+    List<Interval> intervals;
+    long copyMillis;
+    public CopyTimeTracker() {
+      intervals = Collections.emptyList();
+      copyMillis = 0;
+    }
+    public void add(long s, long e) {
+      Interval interval = new Interval(s, e);
+      copyMillis = getTotalCopyMillis(interval);
+    }
+
+    public long getCopyMillis() {
+      return copyMillis;
+    }
+    // This method captures the time during which any copy was in progress;
+    // each copy time period is recorded in the Interval list.
+    private long getTotalCopyMillis(Interval newInterval) {
+      if (newInterval == null) {
+        return copyMillis;
+      }
+      List<Interval> result = new ArrayList<Interval>(intervals.size() + 1);
+      for (Interval interval : intervals) {
+        if (interval.end < newInterval.start) {
+          result.add(interval);
+        } else if (interval.start > newInterval.end) {
+          result.add(newInterval);
+          newInterval = interval;
+        } else {
+          newInterval = new Interval(
+              Math.min(interval.start, newInterval.start),
+              Math.max(newInterval.end, interval.end));
+        }
+      }
+      result.add(newInterval);
+      intervals = result;
+
+      // compute total millis
+      long length = 0;
+      for (Interval interval : intervals) {
+        length += interval.getIntervalLength();
+      }
+      return length;
+    }
+
+    private static class Interval {
+      final long start;
+      final long end;
+      public Interval(long s, long e) {
+        start = s;
+        end = e;
+      }
+
+      public long getIntervalLength() {
+        return end - start;
      }
+    }
+  }
 }
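
The CopyTimeTracker above keeps a sorted, disjoint list of copy intervals: each new interval is coalesced with any intervals it overlaps, and getCopyMillis() returns the total length of the union, i.e. the time during which at least one copy was in flight, so parallel fetchers do not inflate the denominator. A standalone sketch of the same union idea (a hypothetical class for experimentation, not the patch's code), fed the first three intervals from the test below:

import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the interval-union bookkeeping.
class IntervalUnionSketch {
  static class Interval {
    final long start, end;
    Interval(long s, long e) { start = s; end = e; }
  }

  private List<Interval> intervals = new ArrayList<Interval>();

  // Merge [s, e] into the set and return the total covered length in millis.
  long add(long s, long e) {
    Interval added = new Interval(s, e);
    List<Interval> merged = new ArrayList<Interval>(intervals.size() + 1);
    for (Interval cur : intervals) {
      if (cur.end < added.start) {
        merged.add(cur);               // cur lies entirely before the new one
      } else if (cur.start > added.end) {
        merged.add(added);             // new one slots in before cur
        added = cur;
      } else {
        added = new Interval(          // overlap or adjacency: coalesce
            Math.min(cur.start, added.start),
            Math.max(cur.end, added.end));
      }
    }
    merged.add(added);
    intervals = merged;
    long total = 0;
    for (Interval cur : intervals) {
      total += cur.end - cur.start;
    }
    return total;
  }

  public static void main(String[] args) {
    IntervalUnionSketch tracker = new IntervalUnionSketch();
    System.out.println(tracker.add(60000, 100000)); // 40000:  [60s,100s]
    System.out.println(tracker.add(0, 50000));      // 90000:  [0,50s] + [60s,100s]
    System.out.println(tracker.add(25000, 80000));  // 100000: merges to [0,100s]
  }
}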

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java

Lines changed: 157 additions & 0 deletions
@@ -17,9 +17,20 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -66,4 +77,150 @@ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
         0.0f);
     Assert.assertTrue(scheduler.waitUntilDone(1));
   }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestAggregatedTransferRate() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(10);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked") // needed for mock with generics
+    CombineOutputCollector<K, V> mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null,
+        null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+    TaskAttemptID attemptID0 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 0), 0);
+
+    //adding the 1st interval, 40MB from 60s to 100s
+    long bytes = (long)40 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000000_0 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(1 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID1 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 1), 1);
+
+    //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000001_1 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(2 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID2 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 2), 2);
+
+    //adding the 3rd interval overlapping with the 1st and the 2nd interval,
+    //110MB from 25s to 80s
+    bytes = (long)110 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000002_2 succeeded at 2.00 MB/s)"
+        + " Aggregated copy rate(3 of 10 at 2.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID3 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 3), 3);
+
+    //adding the 4th interval just after the 1st interval, 100MB from 100s to 300s
+    bytes = (long)100 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000003_3 succeeded at 0.50 MB/s)"
+        + " Aggregated copy rate(4 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID4 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 4), 4);
+
+    //adding the 5th interval after the 4th, 50MB from 350s to 400s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000004_4 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(5 of 10 at 1.00 MB/s)", progress.toString());
+
+
+    TaskAttemptID attemptID5 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 5), 5);
+    //adding the 6th interval after the 5th, 50MB from 450s to 500s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000005_5 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(6 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID6 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 6), 6);
+    //adding the 7th interval between the 4th and the 5th, 20MB from 320s to 340s
+    bytes = (long)20 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000006_6 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(7 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID7 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 7), 7);
+    //adding the 8th interval overlapping with the 4th, 5th, and 7th, 30MB from 290s to 350s
+    bytes = (long)30 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000007_7 succeeded at 0.50 MB/s)"
+        + " Aggregated copy rate(8 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID8 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 8), 8);
+    //adding the 9th interval overlapping with the 5th and 6th, 50MB from 400s to 450s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000008_8 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(9 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID9 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test",0), TaskType.MAP, 9), 9);
+    //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s
+    bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000009_9 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
+
+  }
 }
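
To check the expected strings by hand: each per-copy rate is that copy's bytes over its own duration (the first copy moves 40 MB in 40 s, hence 1.00 MB/s), while each aggregated rate divides the running byte total by the union of all copy intervals so far. After the 3rd copy, for example, 40 + 50 + 110 = 200 MB have been shuffled, and the intervals [60 s, 100 s], [0 s, 50 s], and [25 s, 80 s] merge into [0 s, 100 s], giving 200 MB / 100 s = 2.00 MB/s. Under the old computation, every aggregated figure would instead shrink as wall-clock time elapsed between copies.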
