6 changes: 6 additions & 0 deletions build.gradle
@@ -756,6 +756,7 @@ project(':iceberg-hive-metastore') {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager'
}

// By default, hive-exec is a fat/uber jar and it exports a guava library
@@ -772,6 +773,7 @@ project(':iceberg-hive-metastore') {
exclude group: 'org.apache.calcite'
exclude group: 'org.apache.calcite.avatica'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager'
}

testImplementation(libs.hive2.metastore) {
@@ -788,6 +790,10 @@ project(':iceberg-hive-metastore') {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-common'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-applicationhistoryservice'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-resourcemanager'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-server-web-proxy'
}

compileOnly(libs.hadoop3.client) {
157 changes: 153 additions & 4 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -18,25 +18,39 @@
*/
package org.apache.iceberg.hadoop;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.wrappedio.WrappedIO;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
@@ -173,10 +187,36 @@ public void deletePrefix(String prefix) {
}
}

/**
* Delete files, attempting the Hadoop bulk delete API first and falling back to single delete
* calls for any paths it could not remove.
*
* @param pathsToDelete The paths to delete
* @throws BulkDeletionFailureException if one or more files could not be deleted.
*/
@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
public void deleteFiles(final Iterable<String> pathsToDelete)
throws BulkDeletionFailureException {
Iterable<String> targetPaths = pathsToDelete;
try {
final List<Map.Entry<Path, String>> pathsNotDeleted = hadoopBulkDelete(targetPaths);
if (pathsNotDeleted.isEmpty()) {
return;
}
// one or more files were not deleted.
targetPaths =
pathsNotDeleted.stream()
.map(
entry -> {
LOG.info("Failed to delete {} cause: {}", entry.getKey(), entry.getValue());
return entry.getKey().toString();
})
.collect(Collectors.toList());
} catch (RuntimeException e) {
LOG.warn("Failed to use bulk delete -falling back to single delete calls", e);
}

AtomicInteger failureCount = new AtomicInteger(0);
Tasks.foreach(pathsToDelete)
Tasks.foreach(targetPaths)
.executeWith(executorService())
.retry(DELETE_RETRY_ATTEMPTS)
.stopRetryOn(FileNotFoundException.class)
@@ -187,12 +227,108 @@ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailu
failureCount.incrementAndGet();
})
.run(this::deleteFile);

if (failureCount.get() != 0) {
throw new BulkDeletionFailureException(failureCount.get());
}
}

/**
* Delete files through the Hadoop Bulk Delete API.
*
* @param pathnames paths to delete.
* @return all paths which could not be deleted, each paired with the failure reason
* @throws UncheckedIOException if an IOException was raised in the invoked methods.
* @throws RuntimeException if interrupted while waiting for deletions to complete.
*/
private List<Map.Entry<Path, String>> hadoopBulkDelete(Iterable<String> pathnames) {

LOG.debug("Using bulk delete operation to delete files");

SetMultimap<Path, Path> fsMap = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
Map<Path, Integer> fsPageSizeMap = Maps.newHashMap();
List<Map.Entry<Path, String>> filesNotDeleted = Lists.newArrayList();
List<Future<List<Map.Entry<Path, String>>>> deletionTasks = Lists.newArrayList();
final Path rootPath = new Path("/");
final Configuration conf = hadoopConf.get();
for (String name : pathnames) {
Path target = new Path(name);
LOG.debug("Deleting '{}' mapped to path '{}'", name, target);
final FileSystem fs;
try {
fs = Util.getFs(target, conf);
} catch (Exception e) {
LOG.warn("Failed to load filesystem for path: {}", target, e);
filesNotDeleted.add(new AbstractMap.SimpleImmutableEntry<>(target, e.toString()));
continue;
}
Path fsRoot = fs.makeQualified(rootPath);
int pageSize;
if (!fsPageSizeMap.containsKey(fsRoot)) {
pageSize = WrappedIO.bulkDelete_pageSize(fs, rootPath);
fsPageSizeMap.put(fsRoot, pageSize);
} else {
pageSize = fsPageSizeMap.get(fsRoot);
}

Set<Path> pathsForFilesystem = fsMap.get(fsRoot);
Path targetPath = fs.makeQualified(target);
pathsForFilesystem.add(targetPath);

if (pathsForFilesystem.size() == pageSize) {
Collection<Path> paths = Sets.newHashSet(pathsForFilesystem);
deletionTasks.add(executorService().submit(() -> deleteBatch(fs, fsRoot, paths)));
fsMap.removeAll(fsRoot);
}
}

for (Map.Entry<Path, Collection<Path>> pathsToDeleteByFileSystem : fsMap.asMap().entrySet()) {
Path fsRoot = pathsToDeleteByFileSystem.getKey();
deletionTasks.add(
executorService()
.submit(
() ->
deleteBatch(
Util.getFs(fsRoot, conf), fsRoot, pathsToDeleteByFileSystem.getValue())));
}

LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size());

for (Future<List<Map.Entry<Path, String>>> deletionTask : deletionTasks) {
try {
List<Map.Entry<Path, String>> failedDeletions = deletionTask.get();
failedDeletions.forEach(
entry -> {
LOG.debug("Failed to delete object at path {}: {}", entry.getKey(), entry.getValue());
filesNotDeleted.add(entry);
});
} catch (ExecutionException e) {
LOG.warn("Exception during batch deletion", e.getCause());
// the whole batch failed; its paths are not returned to the caller, so only the cause is logged
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
deletionTasks.stream().filter(task -> !task.isDone()).forEach(task -> task.cancel(true));
throw new RuntimeException("Interrupted when waiting for deletions to complete", e);
}
}
return filesNotDeleted;
}

/**
* Blocking batch delete.
*
* @param fs filesystem.
* @param fsRoot root of the filesystem
* @param paths paths to delete.
* @return paths which could not be deleted, each paired with the error message
* @throws UncheckedIOException IO problem
*/
private List<Map.Entry<Path, String>> deleteBatch(
FileSystem fs, final Path fsRoot, Collection<Path> paths) {

LOG.debug("Deleting batch of {} files under {}", paths.size(), fsRoot);
return WrappedIO.bulkDelete_delete(fs, fsRoot, paths);
}

private int deleteThreads() {
int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue);
@@ -213,10 +349,11 @@ private ExecutorService executorService() {

/**
* This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator.
* Forwards {@link #close()} to the delegate if it is Closeable.
*
* @param <E> element type
*/
private static class AdaptingIterator<E> implements Iterator<E>, RemoteIterator<E> {
private static class AdaptingIterator<E> implements Iterator<E>, RemoteIterator<E>, Closeable {
private final RemoteIterator<E> delegate;

AdaptingIterator(RemoteIterator<E> delegate) {
Expand All @@ -240,5 +377,17 @@ public E next() {
throw new UncheckedIOException(e);
}
}

@Override
public void close() throws IOException {
if (delegate instanceof Closeable) {
((Closeable) delegate).close();
}
}

@Override
public String toString() {
return delegate.toString();
}
}
}
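
For reference, the new code path boils down to two static calls on Hadoop's WrappedIO wrapper: query the per-filesystem page size, then submit a batch of fully qualified paths under a common root. The sketch below is a minimal, standalone illustration of those calls, assuming a Hadoop 3 client that ships org.apache.hadoop.io.wrappedio.WrappedIO; the bucket URI and file names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.wrappedio.WrappedIO;

public class BulkDeleteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path root = new Path("s3a://example-bucket/"); // hypothetical target filesystem
    FileSystem fs = root.getFileSystem(conf);

    // Per-filesystem limit on how many paths a single bulk delete call may take
    // (typically 1 for HDFS/local, larger for object stores such as S3A).
    int pageSize = WrappedIO.bulkDelete_pageSize(fs, root);

    List<Path> batch =
        List.of(
            fs.makeQualified(new Path("s3a://example-bucket/data/part-0.parquet")),
            fs.makeQualified(new Path("s3a://example-bucket/data/part-1.parquet")));

    if (batch.size() <= pageSize) {
      // Returns only the paths that could NOT be deleted, paired with an error string;
      // an empty list means every path in the batch was removed.
      List<Map.Entry<Path, String>> failures = WrappedIO.bulkDelete_delete(fs, root, batch);
      failures.forEach(e -> System.err.println(e.getKey() + ": " + e.getValue()));
    }
  }
}
```

This mirrors what HadoopFileIO does above: it groups incoming paths per filesystem, fills batches up to the reported page size, submits each batch on the delete executor, and only falls back to per-file Tasks.foreach deletes for whatever the bulk calls report as failed.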