|
26 | 26 | import java.io.InputStreamReader; |
27 | 27 | import java.io.PrintStream; |
28 | 28 | import java.text.SimpleDateFormat; |
| 29 | +import java.util.ArrayList; |
29 | 30 | import java.util.Date; |
30 | 31 | import java.util.Iterator; |
| 32 | +import java.util.List; |
31 | 33 | import java.util.StringTokenizer; |
| 34 | +import java.util.concurrent.Callable; |
| 35 | +import java.util.concurrent.ExecutionException; |
| 36 | +import java.util.concurrent.ExecutorService; |
| 37 | +import java.util.concurrent.Future; |
32 | 38 |
|
33 | 39 | import org.apache.hadoop.HadoopIllegalArgumentException; |
34 | 40 | import org.apache.hadoop.conf.Configuration; |
|
55 | 61 | import org.apache.hadoop.mapred.SequenceFileInputFormat; |
56 | 62 | import org.apache.hadoop.util.Tool; |
57 | 63 | import org.apache.hadoop.util.ToolRunner; |
| 64 | +import org.apache.hadoop.util.concurrent.HadoopExecutors; |
58 | 65 | import org.slf4j.Logger; |
59 | 66 | import org.slf4j.LoggerFactory; |
60 | 67 |
|
@@ -105,6 +112,8 @@ public class NNBench extends Configured implements Tool { |
105 | 112 | private static final String OP_RENAME = "rename"; |
106 | 113 | private static final String OP_DELETE = "delete"; |
107 | 114 | private static final int MAX_OPERATION_EXCEPTIONS = 1000; |
| 115 | + private ExecutorService executorService = |
| 116 | + HadoopExecutors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); |
108 | 117 |
|
109 | 118 | // To display in the format that matches the NN and DN log format |
110 | 119 | // Example: 2007-10-26 00:01:19,853 |
@@ -134,27 +143,60 @@ private void cleanupBeforeTestrun() throws IOException { |
134 | 143 | * |
135 | 144 | * @throws IOException on error |
136 | 145 | */ |
137 | | - private void createControlFiles() throws IOException { |
| 146 | + private void createControlFiles() throws ExecutionException, InterruptedException { |
138 | 147 | LOG.info("Creating " + numberOfMaps + " control files"); |
139 | 148 |
|
| 149 | + List<Future<Void>> list = new ArrayList<>(); |
140 | 150 | for (int i = 0; i < numberOfMaps; i++) { |
141 | 151 | String strFileName = "NNBench_Controlfile_" + i; |
142 | 152 | Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME), |
143 | 153 | strFileName); |
144 | 154 |
|
| 155 | + Future<Void> future = executorService.submit(new createControlFile(strFileName, filePath, i)); |
| 156 | + list.add(future); |
| 157 | + } |
| 158 | + |
| 159 | + for (int i = 0; i < list.size(); i++) { |
| 160 | + try { |
| 161 | + list.get(i).get(); |
| 162 | + } catch (InterruptedException | ExecutionException e) { |
| 163 | + LOG.error("Creating control files Error."); |
| 164 | + throw e; |
| 165 | + } |
| 166 | + } |
| 167 | + |
| 168 | + executorService.shutdown(); |
| 169 | + } |
| 170 | + |
| 171 | + private class createControlFile implements Callable<Void> { |
| 172 | + String strFileName; |
| 173 | + Path filePath; |
| 174 | + int order; |
| 175 | + |
| 176 | + public createControlFile(String strFileName, Path filePath, int order) { |
| 177 | + this.strFileName = strFileName; |
| 178 | + this.filePath = filePath; |
| 179 | + this.order = order; |
| 180 | + } |
| 181 | + |
| 182 | + @Override |
| 183 | + public Void call() throws Exception { |
145 | 184 | SequenceFile.Writer writer = null; |
146 | 185 | try { |
147 | 186 | writer = SequenceFile.createWriter(getConf(), Writer.file(filePath), |
148 | 187 | Writer.keyClass(Text.class), Writer.valueClass(LongWritable.class), |
149 | 188 | Writer.compression(CompressionType.NONE)); |
150 | | - writer.append(new Text(strFileName), new LongWritable(i)); |
| 189 | + writer.append(new Text(strFileName), new LongWritable(order)); |
151 | 190 | } finally { |
152 | 191 | if (writer != null) { |
153 | 192 | writer.close(); |
154 | 193 | } |
155 | 194 | } |
| 195 | + return null; |
156 | 196 | } |
| 197 | + |
157 | 198 | } |
| 199 | + |
158 | 200 | /** |
159 | 201 | * Display version |
160 | 202 | */ |
|
0 commit comments