Skip to content

Commit 4c94831

Browse files
jianghuazhuzhujianghua
andauthored
HDFS-16173.Improve CopyCommands#Put#executor queue configurability. (#3302)
Co-authored-by: zhujianghua <zhujianghua@zhujianghuadeMacBook-Pro.local> Reviewed-by: Hui Fei <ferhui@apache.org> Reviewed-by: Viraj Jasani <vjasani@apache.org>
1 parent aa9cdf2 commit 4c94831

File tree

4 files changed

+62
-10
lines changed

4 files changed

+62
-10
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.hadoop.fs.Path;
3939
import org.apache.hadoop.fs.PathIsDirectoryException;
4040
import org.apache.hadoop.io.IOUtils;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
4143

4244
/** Various commands for copy files */
4345
@InterfaceAudience.Private
@@ -239,15 +241,20 @@ protected void processOptions(LinkedList<String> args)
239241
* Copy local files to a remote filesystem
240242
*/
241243
public static class Put extends CommandWithDestination {
244+
245+
public static final Logger LOG = LoggerFactory.getLogger(Put.class);
246+
242247
private ThreadPoolExecutor executor = null;
248+
private int threadPoolQueueSize = 1024;
243249
private int numThreads = 1;
244250

245251
private static final int MAX_THREADS =
246252
Runtime.getRuntime().availableProcessors() * 2;
247253

248254
public static final String NAME = "put";
249255
public static final String USAGE =
250-
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
256+
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
257+
"<localsrc> ... <dst>";
251258
public static final String DESCRIPTION =
252259
"Copy files from the local file system " +
253260
"into fs. Copying fails if the file already " +
@@ -256,6 +263,8 @@ public static class Put extends CommandWithDestination {
256263
" -p : Preserves timestamps, ownership and the mode.\n" +
257264
" -f : Overwrites the destination if it already exists.\n" +
258265
" -t <thread count> : Number of threads to be used, default is 1.\n" +
266+
" -q <threadPool size> : ThreadPool queue size to be used, " +
267+
"default is 1024.\n" +
259268
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
260269
" replication factor of 1. This flag will result in reduced" +
261270
" durability. Use with care.\n" +
@@ -266,8 +275,10 @@ protected void processOptions(LinkedList<String> args) throws IOException {
266275
CommandFormat cf =
267276
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
268277
cf.addOptionWithValue("t");
278+
cf.addOptionWithValue("q");
269279
cf.parse(args);
270280
setNumberThreads(cf.getOptValue("t"));
281+
setThreadPoolQueueSize(cf.getOptValue("q"));
271282
setOverwrite(cf.getOpt("f"));
272283
setPreserve(cf.getOpt("p"));
273284
setLazyPersist(cf.getOpt("l"));
@@ -299,7 +310,7 @@ protected void processArguments(LinkedList<PathData> args)
299310
}
300311

301312
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
302-
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
313+
TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
303314
new ThreadPoolExecutor.CallerRunsPolicy());
304315
super.processArguments(args);
305316

@@ -329,6 +340,25 @@ private void setNumberThreads(String numberThreadsString) {
329340
}
330341
}
331342

343+
private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
344+
if (numThreadPoolQueueSize != null) {
345+
int parsedValue = Integer.parseInt(numThreadPoolQueueSize);
346+
if (parsedValue < 1) {
347+
LOG.warn("The value of the thread pool queue size cannot be " +
348+
"less than 1, and the default value is used here. " +
349+
"The default size is 1024.");
350+
threadPoolQueueSize = 1024;
351+
} else {
352+
threadPoolQueueSize = parsedValue;
353+
}
354+
}
355+
}
356+
357+
@VisibleForTesting
358+
protected int getThreadPoolQueueSize() {
359+
return threadPoolQueueSize;
360+
}
361+
332362
private void copyFile(PathData src, PathData target) throws IOException {
333363
if (isPathRecursable(src)) {
334364
throw new PathIsDirectoryException(src.toString());

hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ Returns 0 on success and -1 on error.
525525
put
526526
---
527527

528-
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1> .. ]. <dst>`
528+
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] [ - | <localsrc1> .. ]. <dst>`
529529

530530
Copy single src, or multiple srcs from local file system to the destination file system.
531531
Also reads input from stdin and writes to destination file system if the source is set to "-"
@@ -542,6 +542,7 @@ Options:
542542
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
543543
factor of 1. This flag will result in reduced durability. Use with care.
544544
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
545+
* `-q <threadPool queue size>` : ThreadPool queue size to be used, default is 1024.
545546

546547

547548
Examples:
@@ -550,6 +551,7 @@ Examples:
550551
* `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
551552
* `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
552553
* `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
554+
* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3`
553555

554556
Exit Code:
555557

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,22 @@ public void testPutWithoutP() throws Exception {
122122
assertAttributesChanged(TO);
123123
}
124124

125+
@Test(timeout = 10000)
126+
public void testPutWithPQ() throws Exception {
127+
Put put = new Put();
128+
run(put, "-p", "-q", "100", FROM.toString(), TO.toString());
129+
assertEquals(put.getThreadPoolQueueSize(), 100);
130+
assertAttributesPreserved(TO);
131+
}
132+
133+
@Test(timeout = 10000)
134+
public void testPutWithQ() throws Exception {
135+
Put put = new Put();
136+
run(put, "-q", "100", FROM.toString(), TO.toString());
137+
assertEquals(put.getThreadPoolQueueSize(), 100);
138+
assertAttributesChanged(TO);
139+
}
140+
125141
@Test(timeout = 10000)
126142
public void testPutWithSplCharacter() throws Exception {
127143
fs.mkdirs(DIR_FROM_SPL);

hadoop-common-project/hadoop-common/src/test/resources/testConf.xml

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@
498498
<type>RegexpComparator</type>
499499
<comparator>
500500
<type>RegexpComparator</type>
501-
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
501+
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
502502
</comparator>
503503
</comparator>
504504
<comparator>
@@ -515,19 +515,23 @@
515515
</comparator>
516516
<comparator>
517517
<type>RegexpComparator</type>
518-
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
518+
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
519519
</comparator>
520520
<comparator>
521521
<type>RegexpComparator</type>
522-
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
522+
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
523523
</comparator>
524524
<comparator>
525525
<type>RegexpComparator</type>
526-
<expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output>
526+
<expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output>
527527
</comparator>
528528
<comparator>
529529
<type>RegexpComparator</type>
530-
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
530+
<expected-output>^\s*-q &lt;threadPool size&gt; ThreadPool queue size to be used, default is 1024.( )*</expected-output>
531+
</comparator>
532+
<comparator>
533+
<type>RegexpComparator</type>
534+
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
531535
</comparator>
532536
<comparator>
533537
<type>RegexpComparator</type>
@@ -539,7 +543,7 @@
539543
</comparator>
540544
<comparator>
541545
<type>RegexpComparator</type>
542-
<expected-output>^\s*-d Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
546+
<expected-output>^\s*-d Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
543547
</comparator>
544548
</comparators>
545549
</test>
@@ -554,7 +558,7 @@
554558
<comparators>
555559
<comparator>
556560
<type>RegexpComparator</type>
557-
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
561+
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
558562
</comparator>
559563
<comparator>
560564
<type>RegexpComparator</type>

0 commit comments

Comments
 (0)