Spark 3.2: Add compareToFileList to DeleteOrphanFiles action (#4503)
bijanhoule authored Apr 24, 2022
1 parent 449a743 commit 17c0ff7
Showing 6 changed files with 289 additions and 3 deletions.
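
The change lets callers supply their own file listing instead of having the action list the table location: BaseDeleteOrphanFilesSparkAction gains a compareToFileList(Dataset<Row>) method that expects a string file_path column and a timestamp last_modified column, and the remove_orphan_files procedure gains an optional file_list_view parameter naming a view with that schema. A minimal sketch of the action-side usage, assuming an existing SparkSession spark, a loaded Iceberg Table table, and an illustrative inventory table as the listing source (those names are placeholders, not part of this commit):

// Build a listing with the required columns; the source table name is hypothetical.
Dataset<Row> fileList = spark.read().table("audit.s3_inventory")
    .selectExpr("path AS file_path", "CAST(modified AS TIMESTAMP) AS last_modified");

// Hand the listing to the action; the location, olderThan, and deleteWith settings still apply.
DeleteOrphanFiles.Result result =
    ((BaseDeleteOrphanFilesSparkAction) SparkActions.get().deleteOrphanFiles(table))
        .compareToFileList(fileList)
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        .execute();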
@@ -315,6 +315,46 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
() -> sql(
"CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
catalogName, tableIdent, -1));

String tempViewName = "file_list_test";
spark.emptyDataFrame().createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if file_list_view is missing required columns",
IllegalArgumentException.class,
"does not exist. Available:",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));

spark
.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.INT(), Encoders.TIMESTAMP()))
.toDF("file_path", "last_modified")
.createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if file_path has wrong type",
IllegalArgumentException.class,
"Invalid file_path column",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));

spark
.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.toDF("file_path", "last_modified")
.createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if last_modified has wrong type",
IllegalArgumentException.class,
"Invalid last_modified column",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));
}

@Test
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -38,6 +39,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
@@ -53,6 +55,8 @@
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +73,12 @@
* removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
* by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
* For example, someone might point this action to the data folder to clean up only orphan data files.
* In addition, there is a way to configure an alternative delete method via {@link #deleteWith(Consumer)}.
* <p>
* Configure an alternative delete method using {@link #deleteWith(Consumer)}.
* <p>
* For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument. This
* skips the directory listing - any files in the dataset provided which are not found in table metadata will
* be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
* <p>
* <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
* the state of the table if another operation is writing at the same time.
@@ -99,6 +108,7 @@ public void accept(String file) {

private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Dataset<Row> compareToFileList;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;

@@ -144,6 +154,37 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFun
return this;
}

public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
StructType schema = files.schema();

StructField filePathField = schema.apply(FILE_PATH);
Preconditions.checkArgument(
filePathField.dataType() == DataTypes.StringType,
"Invalid %s column: %s is not a string",
FILE_PATH,
filePathField.dataType());

StructField lastModifiedField = schema.apply(LAST_MODIFIED);
Preconditions.checkArgument(
lastModifiedField.dataType() == DataTypes.TimestampType,
"Invalid %s column: %s is not a timestamp",
LAST_MODIFIED,
lastModifiedField.dataType());

this.compareToFileList = files;
return this;
}

private Dataset<Row> filteredCompareToFileList() {
Dataset<Row> files = compareToFileList;
if (location != null) {
files = files.filter(files.col(FILE_PATH).startsWith(location));
}
return files
.filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
.select(files.col(FILE_PATH));
}

@Override
public DeleteOrphanFiles.Result execute() {
JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
@@ -163,7 +204,7 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();
Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();

Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
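
compareToFileList validates that the supplied dataset carries exactly the columns named by the FILE_PATH and LAST_MODIFIED constants (file_path as a string, last_modified as a timestamp; see the BaseSparkAction hunk below). A minimal way to build a compatible dataset, modelled on the tests added in this commit; the path and timestamp values are purely illustrative:

// Sketch only: a two-column dataset with the schema the action validates.
Dataset<Row> fileList = spark
    .createDataset(
        Lists.newArrayList(
            new Tuple2<>(
                "s3://bucket/warehouse/db/tbl/data/00000-0-data.parquet",
                new Timestamp(System.currentTimeMillis()))),
        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("file_path", "last_modified"); // names must match FILE_PATH and LAST_MODIFIED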
@@ -62,6 +62,7 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {

protected static final String FILE_PATH = "file_path";
protected static final String FILE_TYPE = "file_type";
protected static final String LAST_MODIFIED = "last_modified";

private static final AtomicInteger JOB_COUNTER = new AtomicInteger();

@@ -24,6 +24,7 @@
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -49,7 +50,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
ProcedureParameter.optional("file_list_view", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -80,12 +82,14 @@ public StructType outputType() {
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
String fileListView = args.isNullAt(5) ? null : args.getString(5);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -113,6 +117,10 @@ public InternalRow[] call(InternalRow args) {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
}

if (fileListView != null) {
((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
}

DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);
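
On the procedure side, file_list_view is resolved with spark().table(viewName), so any temporary view with the expected schema works. A sketch of the end-to-end call, mirroring the SQL string used in the tests; the catalog, table, and view names are placeholders, and fileList is a dataset with the required schema such as the one sketched above:

// Register the listing as a temp view, then reference it by name in the procedure call.
fileList.createOrReplaceTempView("file_listing");
spark.sql(
    "CALL my_catalog.system.remove_orphan_files("
        + "table => 'db.sample', file_list_view => 'file_listing')");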
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -703,4 +705,120 @@ public void testGarbageCollectionDisabled() {
ValidationException.class, "Cannot delete orphan files: GC is disabled",
() -> SparkActions.get().deleteOrphanFiles(table).execute());
}

@Test
public void testCompareToFileList() throws IOException, InterruptedException {
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);

List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));

Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

Path dataPath = new Path(tableLocation + "/data");
FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
List<FilePathLastModifiedRecord> validFiles =
Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
.filter(FileStatus::isFile)
.map(
file ->
new FilePathLastModifiedRecord(
file.getPath().toString(), new Timestamp(file.getModificationTime())))
.collect(Collectors.toList());

Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());

df.write().mode("append").parquet(tableLocation + "/data");

List<FilePathLastModifiedRecord> allFiles =
Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
.filter(FileStatus::isFile)
.map(
file ->
new FilePathLastModifiedRecord(
file.getPath().toString(), new Timestamp(file.getModificationTime())))
.collect(Collectors.toList());

Assert.assertEquals("Should be 3 files", 3, allFiles.size());

List<FilePathLastModifiedRecord> invalidFiles = Lists.newArrayList(allFiles);
invalidFiles.removeAll(validFiles);
List<String> invalidFilePaths =
invalidFiles.stream()
.map(FilePathLastModifiedRecord::getFilePath)
.collect(Collectors.toList());
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to ensure files will be old enough
Thread.sleep(1000);

SparkActions actions = SparkActions.get();

Dataset<Row> compareToFileList =
spark
.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result1 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.deleteWith(s -> { })
.execute();
Assert.assertTrue(
"Default olderThan interval should be safe",
Iterables.isEmpty(result1.orphanFileLocations()));

DeleteOrphanFiles.Result result2 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.deleteWith(s -> { })
.execute();
Assert.assertEquals(
"Action should find 1 file", invalidFilePaths, result2.orphanFileLocations());
Assert.assertTrue(
"Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0))));

DeleteOrphanFiles.Result result3 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
Assert.assertEquals(
"Action should delete 1 file", invalidFilePaths, result3.orphanFileLocations());
Assert.assertFalse(
"Invalid file should not be present", fs.exists(new Path(invalidFilePaths.get(0))));

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
expectedRecords.addAll(records);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);

List<FilePathLastModifiedRecord> outsideLocationMockFiles =
Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L)));

Dataset<Row> compareToFileListWithOutsideLocation =
spark
.createDataFrame(outsideLocationMockFiles, FilePathLastModifiedRecord.class)
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result4 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> { })
.execute();
Assert.assertEquals(
"Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
}
}