Skip to content

Commit

Permalink
Use executor service for cleanup of files
Browse files Browse the repository at this point in the history
  • Loading branch information
mnpoonia committed Sep 17, 2024
1 parent 2f405e9 commit b33878a
Showing 1 changed file with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -439,17 +443,11 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,

List<File> failures = new ArrayList<>();
String startTime = Long.toString(start);
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if its a file archive it
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
failures.add(file);
}
} else {
if (!file.isFile()) {
// otherwise its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Expand All @@ -458,12 +456,52 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
} else {
filesOnly.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
}
ExecutorService executorService = Executors.newFixedThreadPool(25);
Map<File, Future<Boolean>> futures = new HashMap<>();
// In current baseDir all files will be process concurrently
for(File file : filesOnly) {
LOG.trace("Archiving {}", file);
Future<Boolean> archiveTask =
executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futures.put(file, archiveTask);
}

executorService.shutdown();
try {
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
executorService.shutdown();
}
} catch (InterruptedException e) {
LOG.warn("HFileArchive Cleanup thread was interrupted while shutting down");
}

for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
try {
boolean fileCleaned = fileFutureEntry.getValue().get();
if(!fileCleaned) {
LOG.warn(
"Couldn't archive %s into backup directory: %s".formatted(fileFutureEntry.getKey(),
baseArchiveDir));
failures.add(fileFutureEntry.getKey());
}
} catch (InterruptedException e) {
LOG.warn("HFileArchive Cleanup thread was interrupted");
} catch (ExecutionException e) {
// this is IOException
LOG.warn("Failed to archive {}",fileFutureEntry.getKey() , e);
failures.add(fileFutureEntry.getKey());
}

}

return failures;
}

Expand Down

0 comments on commit b33878a

Please sign in to comment.