Skip to content

Commit 8ef07f7

Browse files
author
Amareshwari Sriramadasu
committed
HADOOP-11203. Allow distcp to accept bandwidth in fraction MegaBytes. Contributed by Raju Bairishetti
1 parent 1403b84 commit 8ef07f7

File tree

9 files changed

+40
-19
lines changed

9 files changed

+40
-19
lines changed

hadoop-common-project/hadoop-common/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ Trunk (Unreleased)
5353

5454
IMPROVEMENTS
5555

56+
HADOOP-11203. Allow distcp to accept bandwidth in fraction MegaBytes
57+
(Raju Bairishetti via amareshwari)
58+
5659
HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
5760
not covered (Eric Charles via bobby)
5861

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class DistCpConstants {
3030
public static final int DEFAULT_MAPS = 20;
3131

3232
/* Default bandwidth if none specified */
33-
public static final int DEFAULT_BANDWIDTH_MB = 100;
33+
public static final float DEFAULT_BANDWIDTH_MB = 100;
3434

3535
/* Default strategy for copying. Implementation looked up
3636
from distcp-default.xml

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,11 @@ public enum DistCpOptionSwitch {
174174
"copied to <= n bytes")),
175175

176176
/**
177-
* Specify bandwidth per map in MB
177+
* Specify bandwidth per map in MB, accepts bandwidth as a fraction
178178
*/
179179
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
180-
new Option("bandwidth", true, "Specify bandwidth per map in MB")),
180+
new Option("bandwidth", true, "Specify bandwidth per map in MB,"
181+
+ " accepts bandwidth as a fraction.")),
181182

182183
/**
183184
* Path containing a list of strings, which when found in the path of

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class DistCpOptions {
4747
public static final int maxNumListstatusThreads = 40;
4848
private int numListstatusThreads = 0; // Indicates that flag is not set.
4949
private int maxMaps = DistCpConstants.DEFAULT_MAPS;
50-
private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
50+
private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
5151

5252
private String sslConfigurationFile;
5353

@@ -366,7 +366,7 @@ public void setMaxMaps(int maxMaps) {
366366
*
367367
* @return Bandwidth in MB
368368
*/
369-
public int getMapBandwidth() {
369+
public float getMapBandwidth() {
370370
return mapBandwidth;
371371
}
372372

@@ -375,7 +375,7 @@ public int getMapBandwidth() {
375375
*
376376
* @param mapBandwidth - per map bandwidth
377377
*/
378-
public void setMapBandwidth(int mapBandwidth) {
378+
public void setMapBandwidth(float mapBandwidth) {
379379
assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
380380
this.mapBandwidth = mapBandwidth;
381381
}

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ private static void parseBandwidth(CommandLine command,
293293
DistCpOptions option) {
294294
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
295295
try {
296-
Integer mapBandwidth = Integer.parseInt(
296+
Float mapBandwidth = Float.parseFloat(
297297
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
298298
if (mapBandwidth <= 0) {
299299
throw new IllegalArgumentException("Bandwidth specified is not " +

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public static enum Counter {
6262
BYTESEXPECTED,// Number of bytes expected to be copied.
6363
BYTESFAILED, // Number of bytes that failed to be copied.
6464
BYTESSKIPPED, // Number of bytes that were skipped from copy.
65+
SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap.
66+
BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s.
6567
}
6668

6769
/**
@@ -85,7 +87,9 @@ static enum FileAction {
8587
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
8688

8789
private FileSystem targetFS = null;
88-
private Path targetWorkPath = null;
90+
private Path targetWorkPath = null;
91+
private long startEpoch;
92+
private long totalBytesCopied = 0;
8993

9094
/**
9195
* Implementation of the Mapper::setup() method. This extracts the DistCp-
@@ -118,6 +122,7 @@ public void setup(Context context) throws IOException, InterruptedException {
118122
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
119123
initializeSSLConf(context);
120124
}
125+
startEpoch = System.currentTimeMillis();
121126
}
122127

123128
/**
@@ -288,6 +293,7 @@ private void copyFileWithRetry(String description,
288293
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
289294
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
290295
incrementCounter(context, Counter.COPY, 1);
296+
totalBytesCopied += bytesCopied;
291297
}
292298

293299
private void createTargetDirsWithRetry(String description,
@@ -373,4 +379,13 @@ private boolean canSkip(FileSystem sourceFS, FileStatus source,
373379
return false;
374380
}
375381
}
382+
383+
@Override
384+
protected void cleanup(Context context)
385+
throws IOException, InterruptedException {
386+
super.cleanup(context);
387+
long secs = (System.currentTimeMillis() - startEpoch) / 1000;
388+
incrementCounter(context, Counter.BANDWIDTH_IN_BYTES,
389+
totalBytesCopied / ((secs == 0 ? 1 : secs)));
390+
}
376391
}

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ private static ThrottledInputStream getInputStream(Path path,
293293
Configuration conf) throws IOException {
294294
try {
295295
FileSystem fs = path.getFileSystem(conf);
296-
long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
296+
float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
297297
DistCpConstants.DEFAULT_BANDWIDTH_MB);
298298
FSDataInputStream in = fs.open(path);
299299
return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
public class ThrottledInputStream extends InputStream {
4040

4141
private final InputStream rawStream;
42-
private final long maxBytesPerSec;
42+
private final float maxBytesPerSec;
4343
private final long startTime = System.currentTimeMillis();
4444

4545
private long bytesRead = 0;
@@ -51,8 +51,8 @@ public ThrottledInputStream(InputStream rawStream) {
5151
this(rawStream, Long.MAX_VALUE);
5252
}
5353

54-
public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
55-
assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
54+
public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) {
55+
assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
5656
this.rawStream = rawStream;
5757
this.maxBytesPerSec = maxBytesPerSec;
5858
}

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
public class TestOptionsParser {
3434

35+
private static final float DELTA = 0.001f;
36+
3537
@Test
3638
public void testParseIgnoreFailure() {
3739
DistCpOptions options = OptionsParser.parse(new String[] {
@@ -104,14 +106,14 @@ public void testParsebandwidth() {
104106
DistCpOptions options = OptionsParser.parse(new String[] {
105107
"hdfs://localhost:8020/source/first",
106108
"hdfs://localhost:8020/target/"});
107-
Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
109+
Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
108110

109111
options = OptionsParser.parse(new String[] {
110112
"-bandwidth",
111-
"11",
113+
"11.2",
112114
"hdfs://localhost:8020/source/first",
113115
"hdfs://localhost:8020/target/"});
114-
Assert.assertEquals(options.getMapBandwidth(), 11);
116+
Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA);
115117
}
116118

117119
@Test(expected=IllegalArgumentException.class)
@@ -585,8 +587,8 @@ public void testOptionsAppendToConf() {
585587
options.appendToConf(conf);
586588
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
587589
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
588-
Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
589-
DistCpConstants.DEFAULT_BANDWIDTH_MB);
590+
Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
591+
DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
590592

591593
conf = new Configuration();
592594
Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
@@ -597,14 +599,14 @@ public void testOptionsAppendToConf() {
597599
"-delete",
598600
"-pu",
599601
"-bandwidth",
600-
"11",
602+
"11.2",
601603
"hdfs://localhost:8020/source/first",
602604
"hdfs://localhost:8020/target/"});
603605
options.appendToConf(conf);
604606
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
605607
Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
606608
Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
607-
Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
609+
Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA);
608610
}
609611

610612
@Test

0 commit comments

Comments
 (0)