
Commit 062c50d

MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.
Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
1 parent 8336b91 commit 062c50d

File tree

5 files changed, 165 insertions(+), 8 deletions(-)


hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java

Lines changed: 68 additions & 5 deletions
@@ -17,15 +17,39 @@
  */
 package org.apache.hadoop.mapred.lib;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.*;
-
 /**
  * The MultipleOutputs class simplifies writing to additional outputs other
  * than the job default output via the <code>OutputCollector</code> passed to
@@ -132,6 +156,7 @@ public class MultipleOutputs {
    * Counters group used by the counters of MultipleOutputs.
    */
   private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(MultipleOutputs.class);
 
   /**
    * Checks if a named output is alreadyDefined or not.
@@ -381,6 +406,11 @@ public static boolean getCountersEnabled(JobConf conf) {
   private Map<String, RecordWriter> recordWriters;
   private boolean countersEnabled;
 
+  @VisibleForTesting
+  synchronized void setRecordWriters(Map<String, RecordWriter> recordWriters) {
+    this.recordWriters = recordWriters;
+  }
+
   /**
    * Creates and initializes multiple named outputs support, it should be
    * instantiated in the Mapper/Reducer configure method.
@@ -528,8 +558,41 @@ public void collect(Object key, Object value) throws IOException {
    * could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(((t, e) -> {
+          LOG.error("Thread " + t + " failed unexpectedly", e);
+          encounteredException.set(true);
+        })).build();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+        } catch (IOException e) {
+          LOG.error("Error while closing MultipleOutput file", e);
+          encounteredException.set(true);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      LOG.warn("Closing is Interrupted");
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (encounteredException.get()) {
+      throw new IOException(
+          "One or more threads encountered exception during close. See prior errors.");
     }
   }
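
The change above boils down to a fan-out-and-join pattern: each RecordWriter#close becomes a Callable, a fixed-size pool runs them with ExecutorService#invokeAll, and any failure is recorded in an AtomicBoolean so a single IOException can be rethrown once every writer has been attempted. A condensed, self-contained sketch of the same pattern is shown below; it uses generic java.io.Closeable resources and a hypothetical closeAll helper rather than the Hadoop classes in the patch, so treat it as illustration only.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public final class ParallelCloseSketch {

  /** Closes every resource on a fixed-size pool and throws once if any close failed. */
  static void closeAll(List<? extends Closeable> resources, int nThreads) throws IOException {
    AtomicBoolean encounteredException = new AtomicBoolean(false);
    ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    List<Callable<Object>> tasks = new ArrayList<>(resources.size());
    for (Closeable resource : resources) {
      tasks.add(() -> {
        try {
          resource.close();                  // runs on a pool thread
        } catch (IOException e) {
          encounteredException.set(true);    // remember the failure, keep closing the rest
        }
        return null;
      });
    }
    try {
      pool.invokeAll(tasks);                 // blocks until every close task has run
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    if (encounteredException.get()) {
      throw new IOException("One or more resources failed to close; see prior errors.");
    }
  }
}

Keeping the loop going after a failure and only throwing at the end matches the patch's behavior: every close error is logged, but the task still fails once all writers have been attempted.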

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java

Lines changed: 2 additions & 0 deletions
@@ -131,5 +131,7 @@ public interface MRConfig {
   String MASTER_WEBAPP_UI_ACTIONS_ENABLED =
     "mapreduce.webapp.ui-actions.enabled";
   boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true;
+  String MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = "mapreduce.multiple-outputs-close-threads";
+  int DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = 10;
 }
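
The two constants above introduce a tunable: "mapreduce.multiple-outputs-close-threads" caps the size of the close pool and defaults to 10. A minimal sketch of how a job could override it follows; the class name and the chosen value of 4 are illustrative, not part of the patch.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;

public class MultipleOutputsCloseThreadsExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Equivalent to setting "mapreduce.multiple-outputs-close-threads" in the job
    // configuration; 4 is an arbitrary example value, the shipped default is 10.
    conf.setInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT, 4);
    System.out.println("close threads = " + conf.getInt(
        MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT));
  }
}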

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java

Lines changed: 53 additions & 2 deletions
@@ -19,14 +19,23 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The MultipleOutputs class simplifies writing output data
@@ -191,6 +200,8 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
    * Counters group used by the counters of MultipleOutputs.
    */
   private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(org.apache.hadoop.mapred.lib.MultipleOutputs.class);
 
   /**
    * Cache for the taskContexts
@@ -345,6 +356,11 @@ public static boolean getCountersEnabled(JobContext job) {
     return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
   }
 
+  @VisibleForTesting
+  synchronized void setRecordWriters(Map<String, RecordWriter<?, ?>> recordWriters) {
+    this.recordWriters = recordWriters;
+  }
+
   /**
    * Wraps RecordWriter to increment counters.
    */
@@ -568,8 +584,43 @@ public void setStatus(String status) {
    */
   @SuppressWarnings("unchecked")
   public void close() throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(((t, e) -> {
+          LOG.error("Thread " + t + " failed unexpectedly", e);
+          encounteredException.set(true);
+        })).build();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(context);
+      callableList.add(() -> {
+        try {
+          writer.close(context);
+        } catch (IOException e) {
+          LOG.error("Error while closing MultipleOutput file", e);
+          encounteredException.set(true);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      LOG.warn("Closing is Interrupted");
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (encounteredException.get()) {
+      throw new IOException(
+          "One or more threads encountered exception during close. See prior errors.");
     }
   }
 }
+
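
Both versions of close() (the mapred one earlier and the mapreduce one above) build their executor through Guava's ThreadFactoryBuilder so the worker threads get a recognizable name and any uncaught failure flips the shared error flag. The standalone sketch below isolates just that piece; the class name, pool size, and placeholder task are assumptions for illustration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamedCloseThreadsSketch {
  private static final Logger LOG = LoggerFactory.getLogger(NamedCloseThreadsSketch.class);

  public static void main(String[] args) {
    AtomicBoolean encounteredException = new AtomicBoolean(false);
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("MultipleOutputs-close")        // threads carry this name in stack dumps
        .setUncaughtExceptionHandler((t, e) -> {
          LOG.error("Thread " + t + " failed unexpectedly", e);
          encounteredException.set(true);              // later surfaced as a single IOException
        })
        .build();
    ExecutorService pool = Executors.newFixedThreadPool(2, threadFactory);
    pool.execute(() -> LOG.info("placeholder close task"));  // stand-in for writer.close(...)
    pool.shutdown();
    // In the patch, encounteredException is checked only after invokeAll() has returned.
  }
}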

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java

Lines changed: 20 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
@@ -46,11 +47,16 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestMultipleOutputs extends HadoopTestCase {
 
@@ -70,6 +76,19 @@ public void testWithCounters() throws Exception {
     _testMOWithJavaSerialization(true);
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(expected = IOException.class)
+  public void testParallelCloseIOException() throws IOException {
+    RecordWriter writer = mock(RecordWriter.class);
+    Map<String, RecordWriter> recordWriters = mock(Map.class);
+    when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+    doThrow(new IOException("test IO exception")).when(writer).close(null);
+    JobConf conf = createJobConf();
+    MultipleOutputs mos = new MultipleOutputs(conf);
+    mos.setRecordWriters(recordWriters);
+    mos.close();
+  }
+
   private static final Path ROOT_DIR = new Path("testing/mo");
   private static final Path IN_DIR = new Path(ROOT_DIR, "input");
   private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
@@ -307,6 +326,7 @@ protected void _testMultipleOutputs(boolean withCounters) throws Exception {
 
   }
 
+
   @SuppressWarnings({"unchecked"})
   public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
       Text> {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java

Lines changed: 22 additions & 1 deletion
@@ -31,18 +31,25 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestMRMultipleOutputs extends HadoopTestCase {
 
@@ -62,6 +69,20 @@ public void testWithCounters() throws Exception {
     _testMOWithJavaSerialization(true);
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(expected = IOException.class)
+  public void testParallelCloseIOException() throws IOException, InterruptedException {
+    RecordWriter writer = mock(RecordWriter.class);
+    Map recordWriters = mock(Map.class);
+    when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+    Mapper.Context taskInputOutputContext = mock(Mapper.Context.class);
+    when(taskInputOutputContext.getConfiguration()).thenReturn(createJobConf());
+    doThrow(new IOException("test IO exception")).when(writer).close(taskInputOutputContext);
+    MultipleOutputs<Long, String> mos = new MultipleOutputs<Long, String>(taskInputOutputContext);
+    mos.setRecordWriters(recordWriters);
+    mos.close();
+  }
+
   private static String localPathRoot =
     System.getProperty("test.build.data", "/tmp");
   private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");
@@ -85,7 +106,7 @@ public void tearDown() throws Exception {
     fs.delete(ROOT_DIR, true);
     super.tearDown();
   }
-
+
   protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
     String input = "a\nb\nc\nd\ne\nc\nd\ne";
