Skip to content

Commit 3af0f21

Browse files
committed
HDFS-17581. Add FastCopy tool and support dfs -fastcp command
1 parent 3e2e2c3 commit 3af0f21

File tree

11 files changed

+899
-1
lines changed

11 files changed

+899
-1
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2544,6 +2544,10 @@ public void moveFromLocalFile(Path src, Path dst)
25442544
copyFromLocalFile(true, src, dst);
25452545
}
25462546

2547+
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException {
2548+
throw new UnsupportedOperationException("FileSystem does not support fastcopy method.");
2549+
}
2550+
25472551
/**
25482552
* The src file is on the local disk. Add it to the filesystem at
25492553
* the given dst name.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,14 @@ protected void copyFileToTarget(PathData src, PathData target)
365365
IOUtils.closeStream(in);
366366
}
367367
}
368+
369+
protected void copyFileToTargetWithFastCp(PathData src,PathData target) throws IOException {
370+
final boolean preserveRawXattrs =
371+
checkPathsForReservedRaw(src.path, target.path);
372+
src.fs.setVerifyChecksum(verifyChecksum);
373+
fastCopyToTarget(src, target);
374+
preserveAttributes(src, target, preserveRawXattrs);
375+
}
368376

369377
/**
370378
* Check the source and target paths to ensure that they are either both in
@@ -433,6 +441,24 @@ protected void copyStreamToTarget(InputStream in, PathData target)
433441
}
434442
}
435443

444+
protected void fastCopyToTarget(PathData src, PathData target)
445+
throws IOException {
446+
if (target.exists && (target.stat.isDirectory() || !overwrite)) {
447+
throw new PathExistsException(target.toString());
448+
}
449+
TargetFileSystem targetFs = new TargetFileSystem(target.fs);
450+
try {
451+
PathData tempTarget = direct ? target : target.suffix("._COPYING_");
452+
targetFs.setWriteChecksum(writeChecksum);
453+
src.fs.fastCopy(src.path, tempTarget.path, overwrite);
454+
if (!direct) {
455+
targetFs.rename(tempTarget, target);
456+
}
457+
} finally {
458+
targetFs.close(); // last ditch effort to ensure temp file is removed
459+
}
460+
}
461+
436462
/**
437463
* Preserve the attributes of the source to the target.
438464
* The method calls {@link #shouldPreserve(FileAttribute)} to check what

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.fs.FSDataOutputStream;
3535
import org.apache.hadoop.fs.Path;
3636
import org.apache.hadoop.fs.PathIsDirectoryException;
37+
import org.apache.hadoop.fs.PathOperationException;
3738
import org.apache.hadoop.io.IOUtils;
3839

3940
/** Various commands for copy files */
@@ -44,6 +45,7 @@ class CopyCommands {
4445
public static void registerCommands(CommandFactory factory) {
4546
factory.addClass(Merge.class, "-getmerge");
4647
factory.addClass(Cp.class, "-cp");
48+
factory.addClass(FastCp.class, "-fastcp");
4749
factory.addClass(CopyFromLocal.class, "-copyFromLocal");
4850
factory.addClass(CopyToLocal.class, "-copyToLocal");
4951
factory.addClass(Get.class, "-get");
@@ -209,7 +211,81 @@ private void popPreserveOption(List<String> args) {
209211
}
210212
}
211213
}
212-
214+
215+
static class FastCp extends CopyCommandWithMultiThread {
216+
public static final String NAME = "fastcp";
217+
public static final String USAGE =
218+
"[-f] [-p | -p[topax]] [-d] [-t <thread count>]"
219+
+ " [-q <thread pool queue size>] <src> ... <dst>";
220+
public static final String DESCRIPTION =
221+
"FastCopy files that match the file pattern <src> to a destination."
222+
+ " When copying multiple files, the destination must be a "
223+
+ "directory.\nFlags :\n"
224+
+ " -p[topax] : Preserve file attributes [topx] (timestamps, "
225+
+ "ownership, permission, ACL, XAttr). If -p is specified with "
226+
+ "no arg, then preserves timestamps, ownership, permission. "
227+
+ "If -pa is specified, then preserves permission also because "
228+
+ "ACL is a super-set of permission. Determination of whether raw "
229+
+ "namespace extended attributes are preserved is independent of "
230+
+ "the -p flag.\n"
231+
+ " -f : Overwrite the destination if it already exists.\n"
232+
+ " -d : Skip creation of temporary file(<dst>._COPYING_).\n"
233+
+ " -t <thread count> : Number of threads to be used, "
234+
+ "default is 1.\n"
235+
+ " -q <thread pool queue size> : Thread pool queue size to be "
236+
+ "used, default is 1024.\n";
237+
238+
@Override
239+
protected void processOptions(LinkedList<String> args) throws IOException {
240+
popPreserveOption(args);
241+
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f", "d");
242+
cf.addOptionWithValue("t");
243+
cf.addOptionWithValue("q");
244+
cf.parse(args);
245+
setDirectWrite(cf.getOpt("d"));
246+
setOverwrite(cf.getOpt("f"));
247+
setThreadCount(cf.getOptValue("t"));
248+
setThreadPoolQueueSize(cf.getOptValue("q"));
249+
// should have a -r option
250+
setRecursive(true);
251+
getRemoteDestination(args);
252+
}
253+
254+
private void popPreserveOption(List<String> args) {
255+
for (Iterator<String> iter = args.iterator(); iter.hasNext(); ) {
256+
String cur = iter.next();
257+
if (cur.equals("--")) {
258+
// stop parsing arguments when you see --
259+
break;
260+
} else if (cur.startsWith("-p")) {
261+
iter.remove();
262+
if (cur.length() == 2) {
263+
setPreserve(true);
264+
} else {
265+
String attributes = cur.substring(2);
266+
for (int index = 0; index < attributes.length(); index++) {
267+
preserve(FileAttribute.getAttribute(attributes.charAt(index)));
268+
}
269+
}
270+
return;
271+
}
272+
}
273+
}
274+
275+
@Override
276+
protected void processPath(PathData src, PathData dst) throws IOException {
277+
if (src.stat.isSymlink()) {
278+
// TODO: remove when FileContext is supported, this needs to either
279+
// copy the symlink or deref the symlink
280+
throw new PathOperationException(src.toString());
281+
} else if (src.stat.isFile()) {
282+
copyFileToTargetWithFastCp(src, dst);
283+
} else if (src.stat.isDirectory() && !isRecursive()) {
284+
throw new PathIsDirectoryException(src.toString());
285+
}
286+
}
287+
}
288+
213289
/**
214290
* Copy local files to a remote filesystem
215291
*/

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
137137
void setQuota(Path f, long namespaceQuota, long storagespaceQuota);
138138
void setQuotaByStorageType(Path f, StorageType type, long quota);
139139
StorageStatistics getStorageStatistics();
140+
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException;
140141

141142
/*
142143
Not passed through as the inner implementation will miss features

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
234234
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
235235
StorageStatistics getStorageStatistics();
236236

237+
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException;
238+
237239
FutureDataInputStreamBuilder openFile(Path path)
238240
throws IOException, UnsupportedOperationException;
239241

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,12 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
693693
}.resolve(this, absF);
694694
}
695695

696+
@Override
697+
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException {
698+
FastCopy fastCopy = new FastCopy(getConf(), src, dst, overwrite);
699+
fastCopy.copyFile();
700+
}
701+
696702
/**
697703
* Same as create(), except fails if parent directory doesn't already exist.
698704
*/

0 commit comments

Comments
 (0)