diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index baf6ad9f492f..e3a3bbf64b87 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -315,6 +315,46 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
() -> sql(
"CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
catalogName, tableIdent, -1));
+
+ String tempViewName = "file_list_test";
+ spark.emptyDataFrame().createOrReplaceTempView(tempViewName);
+
+ AssertHelpers.assertThrows(
+ "Should throw an error if file_list_view is missing required columns",
+ IllegalArgumentException.class,
+ "does not exist. Available:",
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
+ catalogName, tableIdent, tempViewName));
+
+ spark
+ .createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.INT(), Encoders.TIMESTAMP()))
+ .toDF("file_path", "last_modified")
+ .createOrReplaceTempView(tempViewName);
+
+ AssertHelpers.assertThrows(
+ "Should throw an error if file_path has wrong type",
+ IllegalArgumentException.class,
+ "Invalid file_path column",
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
+ catalogName, tableIdent, tempViewName));
+
+ spark
+ .createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+ .toDF("file_path", "last_modified")
+ .createOrReplaceTempView(tempViewName);
+
+ AssertHelpers.assertThrows(
+ "Should throw an error if last_modified has wrong type",
+ IllegalArgumentException.class,
+ "Invalid last_modified column",
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
+ catalogName, tableIdent, tempViewName));
}

@Test
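For contrast with the failure cases above, here is a minimal sketch of a well-formed call, reusing this test class's fixtures (spark, sql, catalogName, tableIdent); the view name valid_file_list is illustrative:

// A view with the schema the procedure validates: file_path (string), last_modified (timestamp).
spark
    .createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("file_path", "last_modified")
    .createOrReplaceTempView("valid_file_list");

// With an empty file list, the call should succeed and report no orphan files.
sql(
    "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
    catalogName, tableIdent, "valid_file_list");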
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index e9d1ee8d605d..9b8011856839 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
+import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -38,6 +39,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
@@ -53,6 +55,8 @@
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +73,12 @@
* removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
* by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
* For example, someone might point this action to the data folder to clean up only orphan data files.
- * In addition, there is a way to configure an alternative delete method via {@link #deleteWith(Consumer)}.
+ * <p>
+ * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
+ * <p>
+ * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} method. This
+ * skips the directory listing: any files in the provided dataset that are not found in table metadata will
+ * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
*
* Note: It is dangerous to call this action with a short retention interval as it might corrupt
* the state of the table if another operation is writing at the same time.
@@ -99,6 +108,7 @@ public void accept(String file) {
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+ private Dataset<Row> compareToFileList;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
@@ -144,6 +154,37 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFun
return this;
}

+ public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
+ StructType schema = files.schema();
+
+ StructField filePathField = schema.apply(FILE_PATH);
+ Preconditions.checkArgument(
+ filePathField.dataType() == DataTypes.StringType,
+ "Invalid %s column: %s is not a string",
+ FILE_PATH,
+ filePathField.dataType());
+
+ StructField lastModifiedField = schema.apply(LAST_MODIFIED);
+ Preconditions.checkArgument(
+ lastModifiedField.dataType() == DataTypes.TimestampType,
+ "Invalid %s column: %s is not a timestamp",
+ LAST_MODIFIED,
+ lastModifiedField.dataType());
+
+ this.compareToFileList = files;
+ return this;
+ }
+
+ private Dataset<Row> filteredCompareToFileList() {
+ Dataset<Row> files = compareToFileList;
+ if (location != null) {
+ files = files.filter(files.col(FILE_PATH).startsWith(location));
+ }
+ return files
+ .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
+ .select(files.col(FILE_PATH));
+ }
+
@Override
public DeleteOrphanFiles.Result execute() {
JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
@@ -163,7 +204,7 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
- Dataset<Row> actualFileDF = buildActualFileDF();
+ Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();

Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
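To illustrate the new action API, a minimal sketch of supplying a pre-computed listing instead of the directory scan. The source path "/path/to/listing" and its columns (path, mtime) are hypothetical; only the output columns file_path and last_modified are required, and the cast mirrors the checks in compareToFileList. spark and table are assumed to be in scope:

// Hypothetical listing source; only the resulting column names and types matter.
Dataset<Row> knownFiles =
    spark.read().parquet("/path/to/listing")
        .select(
            functions.col("path").as("file_path"),
            functions.col("mtime").cast(DataTypes.TimestampType).as("last_modified"));

DeleteOrphanFiles.Result result =
    ((BaseDeleteOrphanFilesSparkAction) SparkActions.get().deleteOrphanFiles(table))
        .compareToFileList(knownFiles)
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        .execute();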
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 4e53733ae469..f8c5e454b0ca 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -62,6 +62,7 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
protected static final String FILE_PATH = "file_path";
protected static final String FILE_TYPE = "file_type";
+ protected static final String LAST_MODIFIED = "last_modified";

private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 7dbdbc4a44ab..23d64719081c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -49,7 +50,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
- ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
+ ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
+ ProcedureParameter.optional("file_list_view", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -80,12 +82,14 @@ public StructType outputType() {
}

@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+ String fileListView = args.isNullAt(5) ? null : args.getString(5);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -113,6 +117,10 @@ public InternalRow[] call(InternalRow args) {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
}

+ if (fileListView != null) {
+ ((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
+ }
+
DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);
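End to end, the new parameter might be exercised as below; the catalog, table, and view names are illustrative, and inventoryDF is assumed to be a Dataset<Row> that already has the required file_path and last_modified columns:

// Register the file listing under a name the procedure can resolve.
inventoryDF.createOrReplaceTempView("my_file_list");

// dry_run => true reports orphans without deleting them.
spark.sql(
    "CALL my_catalog.system.remove_orphan_files("
        + "table => 'db.tbl', "
        + "file_list_view => 'my_file_list', "
        + "dry_run => true)");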
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index a62c7ab53136..39427e66b4dd 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -703,4 +705,120 @@ public void testGarbageCollectionDisabled() {
ValidationException.class, "Cannot delete orphan files: GC is disabled",
() -> SparkActions.get().deleteOrphanFiles(table).execute());
}
+
+ @Test
+ public void testCompareToFileList() throws IOException, InterruptedException {
+ Table table =
+ TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+ df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+ df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+ Path dataPath = new Path(tableLocation + "/data");
+ FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
+ List<FilePathLastModifiedRecord> validFiles =
+ Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+ .filter(FileStatus::isFile)
+ .map(
+ file ->
+ new FilePathLastModifiedRecord(
+ file.getPath().toString(), new Timestamp(file.getModificationTime())))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());
+
+ df.write().mode("append").parquet(tableLocation + "/data");
+
+ List<FilePathLastModifiedRecord> allFiles =
+ Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+ .filter(FileStatus::isFile)
+ .map(
+ file ->
+ new FilePathLastModifiedRecord(
+ file.getPath().toString(), new Timestamp(file.getModificationTime())))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals("Should be 3 files", 3, allFiles.size());
+
+ List<FilePathLastModifiedRecord> invalidFiles = Lists.newArrayList(allFiles);
+ invalidFiles.removeAll(validFiles);
+ List<String> invalidFilePaths =
+ invalidFiles.stream()
+ .map(FilePathLastModifiedRecord::getFilePath)
+ .collect(Collectors.toList());
+ Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+
+ // sleep for 1 second to ensure files will be old enough
+ Thread.sleep(1000);
+
+ SparkActions actions = SparkActions.get();
+
+ Dataset<Row> compareToFileList =
+ spark
+ .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+ .withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+
+ DeleteOrphanFiles.Result result1 =
+ ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ .compareToFileList(compareToFileList)
+ .deleteWith(s -> { })
+ .execute();
+ Assert.assertTrue(
+ "Default olderThan interval should be safe",
+ Iterables.isEmpty(result1.orphanFileLocations()));
+
+ DeleteOrphanFiles.Result result2 =
+ ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ .compareToFileList(compareToFileList)
+ .olderThan(System.currentTimeMillis())
+ .deleteWith(s -> { })
+ .execute();
+ Assert.assertEquals(
+ "Action should find 1 file", invalidFilePaths, result2.orphanFileLocations());
+ Assert.assertTrue(
+ "Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0))));
+
+ DeleteOrphanFiles.Result result3 =
+ ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ .compareToFileList(compareToFileList)
+ .olderThan(System.currentTimeMillis())
+ .execute();
+ Assert.assertEquals(
+ "Action should delete 1 file", invalidFilePaths, result3.orphanFileLocations());
+ Assert.assertFalse(
+ "Invalid file should not be present", fs.exists(new Path(invalidFilePaths.get(0))));
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records);
+ expectedRecords.addAll(records);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords =
+ resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+
+ List<FilePathLastModifiedRecord> outsideLocationMockFiles =
+ Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L)));
+
+ Dataset<Row> compareToFileListWithOutsideLocation =
+ spark
+ .createDataFrame(outsideLocationMockFiles, FilePathLastModifiedRecord.class)
+ .withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+
+ DeleteOrphanFiles.Result result4 =
+ ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ .compareToFileList(compareToFileListWithOutsideLocation)
+ .deleteWith(s -> { })
+ .execute();
+ Assert.assertEquals(
+ "Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
+ }
}
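Beyond the live FileSystem listing used in this test, the comparison dataset could come from any source that yields the two required columns, such as a storage inventory dump. A sketch under that assumption; the CSV path and its key/modified_at columns are hypothetical:

// Build the comparison dataset from a hypothetical inventory CSV rather than
// a live listing; compareToFileList only cares about the two output columns.
Dataset<Row> inventory =
    spark.read()
        .option("header", "true")
        .csv("/tmp/storage_inventory.csv")
        .select(
            functions.col("key").as("file_path"),
            functions.col("modified_at").cast(DataTypes.TimestampType).as("last_modified"));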
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java
new file mode 100644
index 000000000000..275e3a520db5
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.sql.Timestamp;
+import java.util.Objects;
+
+public class FilePathLastModifiedRecord {
+ private String filePath;
+ private Timestamp lastModified;
+
+ public FilePathLastModifiedRecord() {
+ }
+
+ public FilePathLastModifiedRecord(String filePath, Timestamp lastModified) {
+ this.filePath = filePath;
+ this.lastModified = lastModified;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public Timestamp getLastModified() {
+ return lastModified;
+ }
+
+ public void setLastModified(Timestamp lastModified) {
+ this.lastModified = lastModified;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FilePathLastModifiedRecord that = (FilePathLastModifiedRecord) o;
+ return Objects.equals(filePath, that.filePath) &&
+ Objects.equals(lastModified, that.lastModified);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(filePath, lastModified);
+ }
+
+ @Override
+ public String toString() {
+ return "FilePathLastModifiedRecord{" +
+ "filePath='" + filePath + '\'' +
+ ", lastModified='" + lastModified + '\'' +
+ '}';
+ }
+}
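For reference, the tests above consume this bean roughly as follows: Spark derives the DataFrame columns from the getters (filePath, lastModified), which are then renamed to the column names the action expects.

// Mirrors the test usage; spark is assumed to be an active SparkSession.
List<FilePathLastModifiedRecord> files =
    Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L)));

Dataset<Row> df =
    spark
        .createDataFrame(files, FilePathLastModifiedRecord.class)
        .withColumnRenamed("filePath", "file_path")
        .withColumnRenamed("lastModified", "last_modified");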