Skip to content

Commit c4a2868

Browse files
committed
HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732).
1 parent 76c40a5 commit c4a2868

File tree

17 files changed

+773
-165
lines changed

17 files changed

+773
-165
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.util.functional;
20+
21+
import javax.annotation.Nullable;
22+
import java.io.IOException;
23+
import java.io.UncheckedIOException;
24+
import java.util.List;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CompletionException;
29+
import java.util.concurrent.Executor;
30+
import java.util.function.Supplier;
31+
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import org.apache.hadoop.util.DurationInfo;
36+
37+
import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
38+
39+
/**
40+
* A bridge from Callable to Supplier; catching exceptions
41+
* raised by the callable and wrapping them as appropriate.
42+
* @param <T> return type.
43+
*/
44+
public final class CommonCallableSupplier<T> implements Supplier {
45+
46+
private static final Logger LOG =
47+
LoggerFactory.getLogger(CommonCallableSupplier.class);
48+
49+
private final Callable<T> call;
50+
51+
/**
52+
* Create.
53+
* @param call call to invoke.
54+
*/
55+
public CommonCallableSupplier(final Callable<T> call) {
56+
this.call = call;
57+
}
58+
59+
@Override
60+
public Object get() {
61+
try {
62+
return call.call();
63+
} catch (RuntimeException e) {
64+
throw e;
65+
} catch (IOException e) {
66+
throw new UncheckedIOException(e);
67+
} catch (Exception e) {
68+
throw new UncheckedIOException(new IOException(e));
69+
}
70+
}
71+
72+
/**
73+
* Submit a callable into a completable future.
74+
* RTEs are rethrown.
75+
* Non RTEs are caught and wrapped; IOExceptions to
76+
* {@code RuntimeIOException} instances.
77+
* @param executor executor.
78+
* @param call call to invoke
79+
* @param <T> type
80+
* @return the future to wait for
81+
*/
82+
@SuppressWarnings("unchecked")
83+
public static <T> CompletableFuture<T> submit(final Executor executor,
84+
final Callable<T> call) {
85+
return CompletableFuture
86+
.supplyAsync(new CommonCallableSupplier<T>(call), executor);
87+
}
88+
89+
/**
90+
* Wait for a list of futures to complete. If the list is empty,
91+
* return immediately.
92+
* @param futures list of futures.
93+
* @throws IOException if one of the called futures raised an IOE.
94+
* @throws RuntimeException if one of the futures raised one.
95+
*/
96+
public static <T> void waitForCompletion(
97+
final List<CompletableFuture<T>> futures) throws IOException {
98+
if (futures.isEmpty()) {
99+
return;
100+
}
101+
// await completion
102+
waitForCompletion(
103+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
104+
}
105+
106+
/**
107+
* Wait for a single of future to complete, extracting IOEs afterwards.
108+
* @param future future to wait for.
109+
* @throws IOException if one of the called futures raised an IOE.
110+
* @throws RuntimeException if one of the futures raised one.
111+
*/
112+
public static <T> void waitForCompletion(final CompletableFuture<T> future)
113+
throws IOException {
114+
try (DurationInfo ignore = new DurationInfo(LOG, false,
115+
"Waiting for task completion")) {
116+
future.join();
117+
} catch (CancellationException e) {
118+
throw new IOException(e);
119+
} catch (CompletionException e) {
120+
raiseInnerCause(e);
121+
}
122+
}
123+
124+
/**
125+
* Wait for a single of future to complete, ignoring exceptions raised.
126+
* @param future future to wait for.
127+
*/
128+
public static <T> void waitForCompletionIgnoringExceptions(
129+
@Nullable final CompletableFuture<T> future) {
130+
if (future != null) {
131+
try (DurationInfo ignore = new DurationInfo(LOG, false,
132+
"Waiting for task completion")) {
133+
future.join();
134+
} catch (Exception e) {
135+
LOG.debug("Ignoring exception raised in task completion: ");
136+
}
137+
}
138+
}
139+
140+
/**
141+
* Block awaiting completion for any non-null future passed in;
142+
* No-op if a null arg was supplied.
143+
* @param future future
144+
* @throws IOException if one of the called futures raised an IOE.
145+
* @throws RuntimeException if one of the futures raised one.
146+
*/
147+
public static void maybeAwaitCompletion(
148+
@Nullable final CompletableFuture<Void> future) throws IOException {
149+
if (future != null) {
150+
waitForCompletion(future);
151+
}
152+
}
153+
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,17 @@
3030
import java.lang.management.ThreadInfo;
3131
import java.lang.management.ThreadMXBean;
3232
import java.lang.reflect.InvocationTargetException;
33+
import java.util.ArrayList;
3334
import java.util.Arrays;
35+
import java.util.List;
3436
import java.util.Locale;
3537
import java.util.Objects;
3638
import java.util.Random;
3739
import java.util.Set;
3840
import java.util.Enumeration;
41+
import java.util.concurrent.CompletableFuture;
3942
import java.util.concurrent.CountDownLatch;
43+
import java.util.concurrent.TimeUnit;
4044
import java.util.concurrent.TimeoutException;
4145
import java.util.concurrent.atomic.AtomicInteger;
4246
import java.util.function.Supplier;
@@ -46,8 +50,11 @@
4650
import org.apache.commons.lang3.RandomStringUtils;
4751
import org.apache.commons.logging.Log;
4852
import org.apache.commons.logging.impl.Log4JLogger;
53+
import org.apache.hadoop.fs.FileSystem;
4954
import org.apache.hadoop.fs.FileUtil;
5055
import org.apache.hadoop.fs.Path;
56+
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
57+
import org.apache.hadoop.util.DurationInfo;
5158
import org.apache.hadoop.util.StringUtils;
5259
import org.apache.hadoop.util.Time;
5360
import org.apache.log4j.Appender;
@@ -61,15 +68,28 @@
6168
import org.junit.Assume;
6269
import org.mockito.invocation.InvocationOnMock;
6370
import org.mockito.stubbing.Answer;
71+
import org.slf4j.LoggerFactory;
6472

73+
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
6574
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
6675
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
6776

77+
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
78+
import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
79+
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
80+
6881
/**
6982
* Test provides some very generic helpers which might be used across the tests
7083
*/
7184
public abstract class GenericTestUtils {
7285

86+
public static final int EXECUTOR_THREAD_COUNT = 64;
87+
88+
private static final org.slf4j.Logger LOG =
89+
LoggerFactory.getLogger(GenericTestUtils.class);
90+
91+
public static final String PREFIX = "file-";
92+
7393
private static final AtomicInteger sequence = new AtomicInteger();
7494

7595
/**
@@ -896,5 +916,132 @@ public static int getTestsThreadCount() {
896916
}
897917
return threadCount;
898918
}
919+
/**
920+
* Write the text to a file asynchronously. Logs the operation duration.
921+
* @param fs filesystem
922+
* @param path path
923+
* @return future to the patch created.
924+
*/
925+
private static CompletableFuture<Path> put(FileSystem fs,
926+
Path path, String text) {
927+
return submit(EXECUTOR, () -> {
928+
try (DurationInfo ignore =
929+
new DurationInfo(LOG, false, "Creating %s", path)) {
930+
createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
931+
return path;
932+
}
933+
});
934+
}
935+
936+
/**
937+
* Build a set of files in a directory tree.
938+
* @param fs filesystem
939+
* @param destDir destination
940+
* @param depth file depth
941+
* @param fileCount number of files to create.
942+
* @param dirCount number of dirs to create at each level
943+
* @return the list of files created.
944+
*/
945+
public static List<Path> createFiles(final FileSystem fs,
946+
final Path destDir,
947+
final int depth,
948+
final int fileCount,
949+
final int dirCount) throws IOException {
950+
return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
951+
new ArrayList<Path>(fileCount),
952+
new ArrayList<Path>(dirCount));
953+
}
954+
955+
/**
956+
* Build a set of files in a directory tree.
957+
* @param fs filesystem
958+
* @param destDir destination
959+
* @param depth file depth
960+
* @param fileCount number of files to create.
961+
* @param dirCount number of dirs to create at each level
962+
* @param paths [out] list of file paths created
963+
* @param dirs [out] list of directory paths created.
964+
* @return the list of files created.
965+
*/
966+
public static List<Path> createDirsAndFiles(final FileSystem fs,
967+
final Path destDir,
968+
final int depth,
969+
final int fileCount,
970+
final int dirCount,
971+
final List<Path> paths,
972+
final List<Path> dirs) throws IOException {
973+
buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
974+
List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
975+
+ dirs.size());
976+
977+
// create directories. With dir marker retention, that adds more entries
978+
// to cause deletion issues
979+
try (DurationInfo ignore =
980+
new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
981+
for (Path path : dirs) {
982+
futures.add(submit(EXECUTOR, () ->{
983+
fs.mkdirs(path);
984+
return path;
985+
}));
986+
}
987+
waitForCompletion(futures);
988+
}
989+
990+
try (DurationInfo ignore =
991+
new DurationInfo(LOG, "Creating %d files", paths.size())) {
992+
for (Path path : paths) {
993+
futures.add(put(fs, path, path.getName()));
994+
}
995+
waitForCompletion(futures);
996+
return paths;
997+
}
998+
}
899999

900-
}
1000+
/**
1001+
* Recursive method to build up lists of files and directories.
1002+
* @param filePaths list of file paths to add entries to.
1003+
* @param dirPaths list of directory paths to add entries to.
1004+
* @param destDir destination directory.
1005+
* @param depth depth of directories
1006+
* @param fileCount number of files.
1007+
* @param dirCount number of directories.
1008+
*/
1009+
public static void buildPaths(final List<Path> filePaths,
1010+
final List<Path> dirPaths, final Path destDir, final int depth,
1011+
final int fileCount, final int dirCount) {
1012+
if (depth <= 0) {
1013+
return;
1014+
}
1015+
// create the file paths
1016+
for (int i = 0; i < fileCount; i++) {
1017+
String name = filenameOfIndex(i);
1018+
Path p = new Path(destDir, name);
1019+
filePaths.add(p);
1020+
}
1021+
for (int i = 0; i < dirCount; i++) {
1022+
String name = String.format("dir-%03d", i);
1023+
Path p = new Path(destDir, name);
1024+
dirPaths.add(p);
1025+
buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
1026+
}
1027+
}
1028+
1029+
/**
1030+
* Given an index, return a string to use as the filename.
1031+
* @param i index
1032+
* @return name
1033+
*/
1034+
public static String filenameOfIndex(final int i) {
1035+
return String.format("%s%03d", PREFIX, i);
1036+
}
1037+
1038+
/**
1039+
* For submitting work.
1040+
*/
1041+
private static final BlockingThreadPoolExecutorService EXECUTOR =
1042+
BlockingThreadPoolExecutorService.newInstance(
1043+
EXECUTOR_THREAD_COUNT,
1044+
EXECUTOR_THREAD_COUNT * 2,
1045+
30, TimeUnit.SECONDS,
1046+
"test-operations");
1047+
}

0 commit comments

Comments
 (0)