Skip to content

Commit 205d342

Browse files
committed
HDFS-17582. Distcp support fastcopy
1 parent 3af0f21 commit 205d342

File tree

15 files changed

+334
-32
lines changed

15 files changed

+334
-32
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ public class FastCopy {
101101

102102
private final DFSClient srcDFSClient;
103103
private final DistributedFileSystem dstFs;
104+
private long chunkOffset = 0;
105+
private long chunkLength = Long.MAX_VALUE;
106+
104107

105108
public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overwrite) throws IOException {
106109
this.conf = conf;
@@ -134,6 +137,12 @@ public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overw
134137
this.copyBlockExecutor = HadoopExecutors.newFixedThreadPool(this.copyBlockExecutorPoolSize);
135138
}
136139

140+
public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overwrite, long chunkOffset, long chunkLength) throws IOException {
141+
this(conf,sourcePath,dstPath,overwrite);
142+
this.chunkOffset = chunkOffset;
143+
this.chunkLength = chunkLength;
144+
}
145+
137146
private class CopyBlockCrossNamespace implements Runnable {
138147
private final ExtendedBlock source;
139148
private final ExtendedBlock target;
@@ -466,7 +475,7 @@ public void copyFile() throws IOException {
466475

467476
LOG.info("Start to copy {} to {}.", src, dst);
468477
try {
469-
LocatedBlocks blocks = srcDFSClient.getLocatedBlocks(src, 0, srcFileStatus.getLen());
478+
LocatedBlocks blocks = srcDFSClient.getLocatedBlocks(src, chunkOffset, chunkLength);
470479
List<LocatedBlock> blocksList = blocks.getLocatedBlocks();
471480
LOG.debug("FastCopy : Block locations retrieved for {} : {}.", src, blocksList);
472481

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ private DistCpConstants() {
6565
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
6666
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
6767
public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
68+
public static final String CONF_LABEL_USE_FAST_COPY = "distcp.use.fast.copy";
6869
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
6970
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
7071
public static final String CONF_LABEL_APPEND = "distcp.copy.append";

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ public boolean shouldSkipCRC() {
9999
return options.shouldSkipCRC();
100100
}
101101

102+
public boolean shouldUseFastCopy() {
103+
return options.shouldSkipCRC();
104+
}
105+
102106
public boolean shouldBlock() {
103107
return options.shouldBlock();
104108
}

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ public enum DistCpOptionSwitch {
153153
new Option("strategy", true, "Copy strategy to use. Default is " +
154154
"dividing work based on file sizes")),
155155

156+
/**
157+
* Copy file uses fastCopy.
158+
*/
159+
USE_FASTCOPY(DistCpConstants.CONF_LABEL_USE_FAST_COPY,
160+
new Option("fastcopy", false, "Copy file uses fastCopy.")),
161+
156162
/**
157163
* Skip CRC checks between source and target, when determining what
158164
* files need to be copied.

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public final class DistCpOptions {
161161
private final boolean directWrite;
162162

163163
private final boolean useIterator;
164+
private final boolean useFastCopy;
164165

165166
private final boolean updateRoot;
166167

@@ -230,6 +231,7 @@ private DistCpOptions(Builder builder) {
230231
this.directWrite = builder.directWrite;
231232

232233
this.useIterator = builder.useIterator;
234+
this.useFastCopy = builder.useFastCopy;
233235

234236
this.updateRoot = builder.updateRoot;
235237
}
@@ -290,6 +292,10 @@ public boolean shouldSkipCRC() {
290292
return skipCRC;
291293
}
292294

295+
public boolean shouldUseFastCopy() {
296+
return useFastCopy;
297+
}
298+
293299
public boolean shouldBlock() {
294300
return blocking;
295301
}
@@ -406,6 +412,8 @@ public void appendToConf(Configuration conf) {
406412
String.valueOf(useRdiff));
407413
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
408414
String.valueOf(skipCRC));
415+
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_FASTCOPY,
416+
String.valueOf(useFastCopy));
409417
if (mapBandwidth > 0) {
410418
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
411419
String.valueOf(mapBandwidth));
@@ -532,6 +540,8 @@ public static class Builder {
532540

533541
private boolean updateRoot = false;
534542

543+
private boolean useFastCopy = false;
544+
535545
public Builder(List<Path> sourcePaths, Path targetPath) {
536546
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
537547
"Source paths should not be null or empty!");
@@ -816,6 +826,11 @@ public Builder withUpdateRoot(boolean updateRootAttrs) {
816826
this.updateRoot = updateRootAttrs;
817827
return this;
818828
}
829+
830+
public Builder withUseFastCopy(boolean useFastCopy) {
831+
this.useFastCopy = useFastCopy;
832+
return this;
833+
}
819834
}
820835

821836
}

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ public static DistCpOptions parse(String[] args)
118118
command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()))
119119
.withUseIterator(
120120
command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch()))
121+
.withUseFastCopy(
122+
command.hasOption(DistCpOptionSwitch.USE_FASTCOPY.getSwitch()))
121123
.withUpdateRoot(
122124
command.hasOption(DistCpOptionSwitch.UPDATE_ROOT.getSwitch()));
123125

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public enum ChecksumComparison {
9797
private boolean verboseLog = false;
9898
private boolean directWrite = false;
9999
private boolean useModTimeToUpdate;
100+
private boolean useFastCopy = false;
100101
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
101102

102103
private FileSystem targetFS = null;
@@ -129,6 +130,7 @@ public void setup(Context context) throws IOException, InterruptedException {
129130
useModTimeToUpdate =
130131
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
131132
CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);
133+
useFastCopy = conf.getBoolean(DistCpOptionSwitch.USE_FASTCOPY.getConfigLabel(), false);
132134

133135
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
134136
Path targetFinalPath = new Path(conf.get(
@@ -272,9 +274,14 @@ private void copyFileWithRetry(String description,
272274
throws IOException, InterruptedException {
273275
long bytesCopied;
274276
try {
275-
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
276-
action, directWrite).execute(sourceFileStatus, target, context,
277-
fileAttributes, sourceStatus);
277+
if (!useFastCopy) {
278+
bytesCopied =
279+
(Long) new RetriableFileCopyCommand(skipCrc, description, action, directWrite).execute(
280+
sourceFileStatus, target, context, fileAttributes, sourceStatus);
281+
} else {
282+
bytesCopied = (Long) new RetriableFileFastCopyCommand(skipCrc, description, action,
283+
directWrite).execute(sourceFileStatus, target, context, fileAttributes, sourceStatus);
284+
}
278285
} catch (Exception e) {
279286
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
280287
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
6666
private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class);
6767
private boolean skipCrc = false;
6868
private boolean directWrite = false;
69-
private FileAction action;
69+
protected FileAction action;
7070

7171
/**
7272
* Constructor, taking a description of the action.
@@ -192,7 +192,7 @@ private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
192192
}
193193

194194
@SuppressWarnings("checkstyle:parameternumber")
195-
private long copyToFile(Path targetPath, FileSystem targetFS,
195+
protected long copyToFile(Path targetPath, FileSystem targetFS,
196196
CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
197197
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum,
198198
FileStatus sourceStatus)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.tools.mapred;
20+
21+
import java.io.IOException;
22+
import java.util.EnumSet;
23+
24+
import org.apache.hadoop.fs.FileChecksum;
25+
import org.apache.hadoop.fs.FileStatus;
26+
import org.apache.hadoop.fs.FileSystem;
27+
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.hdfs.FastCopy;
29+
import org.apache.hadoop.mapreduce.Mapper.Context;
30+
import org.apache.hadoop.tools.CopyListingFileStatus;
31+
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
32+
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
33+
34+
public class RetriableFileFastCopyCommand extends RetriableFileCopyCommand {
35+
public RetriableFileFastCopyCommand(String description, FileAction action) {
36+
super(description, action);
37+
}
38+
39+
public RetriableFileFastCopyCommand(boolean skipCrc, String description, FileAction action) {
40+
super(skipCrc, description, action);
41+
}
42+
43+
public RetriableFileFastCopyCommand(boolean skipCrc, String description, FileAction action,
44+
boolean directWrite) {
45+
super(skipCrc, description, action, directWrite);
46+
}
47+
48+
@Override
49+
protected long copyToFile(Path targetPath, FileSystem targetFS, CopyListingFileStatus source,
50+
long sourceOffset, Context context, EnumSet<FileAttribute> fileAttributes,
51+
FileChecksum sourceChecksum, FileStatus sourceStatus) throws IOException {
52+
FastCopy fastCopy = new FastCopy(context.getConfiguration(), source.getPath(), targetPath,
53+
action == FileAction.OVERWRITE || action == FileAction.APPEND, source.getChunkOffset(),
54+
source.getChunkLength());
55+
fastCopy.copyFile();
56+
57+
if (action == FileAction.APPEND) {
58+
return source.getLen() - sourceOffset;
59+
}
60+
return source.getSizeToCopy();
61+
}
62+
}

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,18 @@ public void testSetSkipCRC() {
134134
Assert.assertTrue(options.shouldSkipCRC());
135135
}
136136

137+
@Test
138+
public void testUseFastCopy() {
139+
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
140+
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
141+
new Path("hdfs://localhost:8020/target/"));
142+
Assert.assertFalse(builder.build().shouldUseFastCopy());
143+
144+
final DistCpOptions options = builder.withUseFastCopy(true)
145+
.build();
146+
Assert.assertTrue(options.shouldUseFastCopy());
147+
}
148+
137149
@Test
138150
public void testSetAtomicCommit() {
139151
final DistCpOptions.Builder builder = new DistCpOptions.Builder(

0 commit comments

Comments
 (0)