
Spark 3.2: Add compareWithFileList param to DeleteOrphanFiles action #4503

Merged

Conversation

@bijanhoule (Contributor) commented Apr 5, 2022

This allows the user to specify an existing table of actual files for the DeleteOrphanFiles action, skipping the directory listing.

This can be used in cases where a cloud storage provider-specific listing is more efficient than a recursive FileSystem listing (e.g., loading an S3 Inventory report into a temp table).
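Conceptually, the action compares the provided file list against the files referenced by table metadata and treats the difference as orphans. A minimal Python sketch of that set logic (illustrative only; the real action performs this as a distributed Spark anti-join, and all names here are hypothetical):

```python
def find_orphans(actual_files, metadata_files):
    """Files present in storage (or in a provided listing) but not
    referenced by table metadata are orphan candidates."""
    return sorted(set(actual_files) - set(metadata_files))

actual = [
    "s3://bucket/tbl/data/a.parquet",
    "s3://bucket/tbl/data/b.parquet",
    "s3://bucket/tbl/data/uncommitted.parquet",
]
referenced = [
    "s3://bucket/tbl/data/a.parquet",
    "s3://bucket/tbl/data/b.parquet",
]
# Only the file missing from metadata is reported:
print(find_orphans(actual, referenced))
```

The provided file list simply replaces the directory-listing step; the metadata side of the comparison is unchanged.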

```java
/**
 * Passes a table which contains the list of actual files in the table. This skips the directory listing - any
 * files in the actualFilesTable provided which are not found in table metadata will be deleted. Not compatible
 * with `location` or `older_than` arguments - this assumes that the provided table of actual files has been
```
Reviewer (Contributor):

What if the table had a timestamp? For example, if the table has a modified_time then we could apply the older_than filter fairly easily rather than making the user handle it.

Reviewer (Contributor):

Adding modified_time sounds like a good idea as ignoring older_than is dangerous. Files just written but not yet committed could be cleaned up, corrupting the table.

Reviewer (Contributor):

I would even consider making it required. How many people will filter the locations manually before calling the action?
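The risk being discussed can be sketched: without an `older_than` cutoff, a file written moments ago but not yet committed to metadata would look like an orphan and be deleted. An illustrative Python sketch, assuming a per-file modified timestamp is available (names and field layout are hypothetical, not the Iceberg API):

```python
import time

def orphans_with_cutoff(candidates, metadata_files, older_than_millis):
    # Only files strictly older than the cutoff are eligible for deletion;
    # recently written, not-yet-committed files are protected.
    referenced = set(metadata_files)
    return [
        path
        for path, modified_ms in candidates
        if path not in referenced and modified_ms < older_than_millis
    ]

now = int(time.time() * 1000)
candidates = [
    ("tbl/data/old-orphan.parquet", now - 7 * 24 * 3600 * 1000),  # a week old
    ("tbl/data/just-written.parquet", now),  # written moments ago
]
cutoff = now - 3 * 24 * 3600 * 1000  # only delete files older than 3 days
print(orphans_with_cutoff(candidates, [], cutoff))
```

Neither file is referenced by metadata, but only the week-old one passes the cutoff; the just-written file survives.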

```java
 * @param tableName the table containing the actual files dataset. Should have a single `file_path` string column
 * @return this for method chaining
 */
default DeleteOrphanFiles actualFilesTable(String tableName) {
```
Reviewer (Contributor):

I think it is a good idea to be able to use a temporary table for the stored procedure, but for the action API I would expect to be able to pass in a Dataset of some kind. Dataset<String> would work, and we could also expose a couple of Java classes to make Dataset<FileLocationAndModifiedTime> and Dataset<FileLocation> work.

I think that we can also make what's happening here a bit more clear in the API by naming the method something like compareToFileList(Dataset<String>):

```java
Dataset<String> files = ...
SparkActions.get()
    .deleteOrphanFiles(catalog.loadTable(identifier))
    .compareToFileList(files)
    .execute();
```

@aokolnychyi (Contributor) commented Apr 8, 2022:

I guess the problem is that the Action API is engine agnostic so we cannot refer to Dataset. We considered offering engine-specific actions at some point but that idea was discarded as it would overcomplicate everything.

Reviewer (Contributor):

I think we should move this into the Spark-specific API then.

@aokolnychyi (Contributor) commented Apr 13, 2022:

@rdblue, you mean exposing this method in BaseDeleteOrphanFilesSparkAction similarly to expire in BaseExpireSnapshotsSparkAction or introduce a Spark-specific interface?

It is a bummer as this functionality can be useful for all engines.

Reviewer (Contributor):

We may be able to do this later, but I think the most natural way to do this for the Actions API is to use a Dataset. We can take another look at this later, too. Maybe we should parameterize this method so that you can specify what collection you want. Flink could use DataStream and Spark could use Dataset.
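The parameterization idea above could be read roughly like this, sketched with Python generics rather than the actual Java API (all names here are hypothetical; Spark would bind the type parameter to Dataset and Flink to DataStream):

```python
from typing import Generic, List, Optional, TypeVar

F = TypeVar("F")  # engine-specific collection type for the file list

class DeleteOrphanFilesAction(Generic[F]):
    """Each engine binds F to its own distributed collection type;
    here a plain list stands in for Dataset/DataStream."""

    def __init__(self) -> None:
        self._file_list: Optional[F] = None

    def compare_to_file_list(self, files: F) -> "DeleteOrphanFilesAction[F]":
        # Store the engine-specific file collection; return self so the
        # call chains like the Java builder-style actions.
        self._file_list = files
        return self

action: DeleteOrphanFilesAction[List[str]] = DeleteOrphanFilesAction()
result = action.compare_to_file_list(["s3://bucket/tbl/data/a.parquet"])
assert result is action  # method chaining works
```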

```diff
@@ -49,7 +49,8 @@
     ProcedureParameter.optional("older_than", DataTypes.TimestampType),
     ProcedureParameter.optional("location", DataTypes.StringType),
     ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
-    ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
+    ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
+    ProcedureParameter.optional("actual_files_table", DataTypes.StringType)
```
Reviewer (Contributor):

What about renaming this to file_list_view or listed_files_view? I think that we want some identifier that matches the name of the action method; that's why I'm thinking file_list or listed_files that are similar to compareToFileList. But we also want to make it clear that this is a table or temporary view. I think it's more likely that the DF is registered as a temp view rather than stored in a table, so I'd probably use the suffix _view.

```diff
@@ -150,10 +150,20 @@ public void testDryRun() throws IOException, InterruptedException {
     Assert.assertEquals("Action should find 1 file", invalidFiles, result2.orphanFileLocations());
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));

+    String actualFilesTableName = "actualFilesTable";
+    spark.createDataset(allFiles, Encoders.STRING()).toDF("file_path").createOrReplaceTempView(actualFilesTableName);
```
Reviewer (Contributor):

I think that this test case should run after the actual orphan file delete. Otherwise, listing produces the same result and we're not actually verifying that we get a different result from the new option.

@rdblue (Contributor) commented Apr 5, 2022:

Thanks, @bijanhoule! This looks close to being ready, but I think we should make a few minor modifications to make the actions API a bit more intuitive. I would expect that to accept a Dataset rather than requiring temp table registration.

@aokolnychyi (Contributor) commented:
I'd be curious to take a look as well tomorrow.

```java
Assert.assertEquals("Action should find 1 file", invalidFiles, result3.orphanFileLocations());
Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));

DeleteOrphanFiles.Result result4 = actions.deleteOrphanFiles(table)
```
Reviewer (Contributor):

If we add support for things like older_than, we should probably make a separate test case for the actual files table option so we can exercise various scenarios focused on that one area.

Reviewer (Contributor):

Agreed. We usually want to add separate test cases anyway.

Comment on lines 97 to 98

```java
        tableWithActualFilePaths == null || (location == null && olderThanMillis == null),
        "actual_files_table cannot be used with `location` or `older_than`"
```
Reviewer (Contributor):

Nit: I think this is over-indented. Code continued on the next line should be indented 4 spaces over from its starting point.

That said, it does line up with the argument on the line above it.

Reviewer (Contributor):

+1, I think it uses 8 instead of 4. The line above is a mistake too.

```diff
@@ -163,7 +194,7 @@ private String jobDesc() {
     Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
     Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
-    Dataset<Row> actualFileDF = buildActualFileDF();
+    Dataset<Row> actualFileDF = this.providedActualFilesDF == null ? buildActualFileDF() : providedActualFilesDF;
```
Reviewer (Contributor):

nit: redundant this

@bijanhoule (Author) commented:

Thanks for the feedback all! I'll take another pass and adjust a few things:

  • move to the Spark-specific API and use a Dataset input rather than a temp table
  • add support for modified_time and enforce older_than. The original thought was to keep this out for simplicity, but I agree it's safer to just make it a requirement.
  • separate out test cases and add coverage for the above

- action: rename actualFilesTable -> compareWithFileList
- procedure: rename actual_files_table -> file_list_view
- interface change: remove from DeleteOrphanFiles, add to BaseDeleteOrphanFilesSparkAction
- apply location and last_modified filtering to compareWithFileList
- refactor test cases
Comment on lines +180 to +182

```java
    if (location != null) {
      files = files.filter(files.col(FILE_PATH).startsWith(location));
    }
```
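The filter above can be mimicked with a plain prefix check. One caveat of any startsWith-based filter is that a bare prefix also matches sibling locations (e.g., `.../tbl` matches `.../tbl2`) unless it ends with a path separator. An illustrative sketch, not the Iceberg implementation:

```python
def filter_by_location(paths, location):
    # Mirrors files.filter(col(FILE_PATH).startsWith(location)).
    return [p for p in paths if p.startswith(location)]

paths = [
    "s3://bucket/tbl/data/a.parquet",
    "s3://bucket/tbl2/data/b.parquet",
]

# A bare prefix matches both tables' files:
print(filter_by_location(paths, "s3://bucket/tbl"))
# Ending the prefix with '/' scopes the filter to one table:
print(filter_by_location(paths, "s3://bucket/tbl/"))
```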
@bijanhoule (Author) commented:

@aokolnychyi -- re: #4503 (comment) does this startsWith location filtering seem reasonable to you for a short-term solution? I know there's some discussion about making some larger changes here.

@bijanhoule (Author) commented:

FYI, updated this PR based on feedback above:

  • action: rename actualFilesTable -> compareWithFileList
  • procedure: rename actual_files_table -> file_list_view
  • interface change: remove from DeleteOrphanFiles, add to BaseDeleteOrphanFilesSparkAction
  • apply location and last_modified filtering to compareWithFileList
  • refactor test cases

@bijanhoule bijanhoule changed the title Spark 3.2: Add actualFilesTable param to DeleteOrphanFiles action Spark 3.2: Add compareWithFileList param to DeleteOrphanFiles action Apr 21, 2022
```java
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to ensure files will be old enough
Thread.sleep(1000);
```
Reviewer (Contributor):

This should use waitUntilAfter rather than sleeping.
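A waitUntilAfter-style helper polls the clock until it is strictly past a target timestamp, so the test waits exactly as long as needed instead of a fixed (and potentially flaky) interval. A rough Python analogue (illustrative; not the Iceberg test utility):

```python
import time

def wait_until_after(timestamp_millis):
    # Sleep in small increments until the current time is strictly
    # past the given timestamp, then return the current time.
    current = int(time.time() * 1000)
    while current <= timestamp_millis:
        time.sleep((timestamp_millis - current) / 1000 + 0.001)
        current = int(time.time() * 1000)
    return current

target = int(time.time() * 1000) + 50  # a point 50 ms in the future
assert wait_until_after(target) > target
```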

@rdblue rdblue merged commit 17c0ff7 into apache:master Apr 24, 2022
@rdblue (Contributor) commented Apr 24, 2022:

This looks great. Thanks for adding this, @bijanhoule!

SinghAsDev pushed a commit to SinghAsDev/iceberg that referenced this pull request Jun 23, 2022
SinghAsDev added a commit to pinterest/iceberg that referenced this pull request Jul 25, 2022
…11)

* Spark 3.2: Add compareWithFileList to DeleteOrphanFiles action (apache#4503)

* Add regex replace support in remove orphan file action and procedure to allow for handling alternative filesystem schemes.