Make DeleteOrphanFiles in Spark reliable #4346

Closed
aokolnychyi opened this issue Mar 16, 2022 · 21 comments

@aokolnychyi
Contributor

There have been multiple attempts to make our DeleteOrphanFiles action more reliable. One such discussion happened more than a year ago. However, we never reached consensus.

I will try to summarize my current thoughts but I encourage everyone to comment as well.

Location Generation

There are three location types in Iceberg.

Table location

Table locations are either provided by the user or defaulted in TableOperations. When defaulting, we currently manipulate raw strings via methods such as String.format. That means there is no normalization/validation for root table locations.

Metadata

Classes that extend BaseMetastoreTableOperations use metadataFileLocation to generate a new location for all types of metadata files. Under the hood, it simply uses String.format and has no location normalization.

private String metadataFileLocation(TableMetadata metadata, String filename) {
  String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);

  if (metadataLocation != null) {
    return String.format("%s/%s", metadataLocation, filename);
  } else {
    return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
  }
}

In HadoopTableOperations, we rely on Path instead of String.format as we have access to Hadoop classes.

private Path metadataPath(String filename) {
  return new Path(metadataRoot(), filename);
}

private Path metadataRoot() {
  return new Path(location, "metadata");
}

That means some normalization is happening for metadata file locations generated in HadoopTableOperations.

Data

Data file locations depend on LocationProvider returned by TableOperations. While users can inject a custom location provider, Iceberg has two built-in implementations:

  • DefaultLocationProvider
  • ObjectStoreLocationProvider

Both built-in implementations use String.format and have no normalization/validation.
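For illustration, here is a minimal sketch of the kind of String.format-based concatenation involved. The class and method names are simplified stand-ins, not the actual built-in providers, but the key point is the same: whatever the table location contains (trailing slashes, double slashes) is concatenated as-is.

// Simplified stand-in for a String.format-based location provider (assumption:
// not the real DefaultLocationProvider, which has more configuration).
class SimpleLocationProvider {
  private final String dataLocation;

  SimpleLocationProvider(String tableLocation) {
    // a table location like "s3://bucket/warehouse/db/tbl/" is used verbatim
    this.dataLocation = String.format("%s/data", tableLocation);
  }

  String newDataLocation(String filename) {
    // a trailing slash in the table location yields ".../tbl//data/<filename>"
    return String.format("%s/%s", dataLocation, filename);
  }
}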

Problem

Right now, DeleteOrphanFiles uses the Hadoop FileSystem to list all actual files in a given location and compares them to the locations stored in the metadata. As discussed above, Iceberg does not do any normalization for locations persisted in the metadata. That means locations returned during listing may have cosmetic differences compared to locations stored in the metadata, even though both can point to the same files. As a consequence, DeleteOrphanFiles can corrupt a table.
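To make the failure mode concrete, a hedged example with made-up paths: a plain string comparison treats cosmetically different forms of the same file as distinct, so a reachable file can look like an orphan.

// Hypothetical paths: the same physical file, once as persisted in metadata
// (written with a double slash) and once as returned by FileSystem listing.
String inMetadata = "hdfs://nn1:8020/warehouse/db/tbl//data/part-00000.parquet";
String fromListing = "hdfs://nn1:8020/warehouse/db/tbl/data/part-00000.parquet";

// Without normalization, a naive string comparison classifies the live file as
// orphan, and DeleteOrphanFiles would delete it.
boolean looksOrphan = !inMetadata.equals(fromListing); // true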

Proposed Approach

  • We cannot change what is already stored in the metadata so DeleteOrphanFiles should normalize locations of reachable files. Since we do listing via Hadoop FileSystem, we should probably leverage Hadoop classes for normalization to avoid surprises. For example, just constructing a new Path from a String normalizes the path part of the URI.
Path path = new Path("hdfs://localhost:8020/user//log/data///dummy_file/");
path.toString(); // "hdfs://localhost:8020/user/log/data/dummy_file"
  • Normalization is required but does not solve all issues. Since table locations are arbitrary, we may hit a few weird cases.

    • Data or metadata locations without a scheme and authority.
    • Changes in the Hadoop conf. We may have one set of configured file systems when the table was created and a completely different one when deleting orphans. For example, the scheme name can change (it is just a string), the authority can be represented via an IP address instead of a host name, or multiple host names can be mapped to the same name node.
    • I am not sure whether it is possible, but can someone migrate from s3a to s3 or vice versa?
  • The action should expose options to ignore the scheme and authority during the comparison. If that happens, only normalized paths will be compared.

  • The location we are about to clean must be validated. If the action is configured to take scheme and authority into account, the provided location must have those set. In other words, it is illegal to provide a location without an authority if the action is supposed to compare authorities.

  • Locations persisted in the metadata without a scheme and authority must inherit those values from the location we scan for orphans, not from the current Hadoop conf. This essentially means we will only compare the normalized path for such locations.

When it comes to possible implementations, we can call mapPartitions on the DataFrame with locations.

Path path = new Path(location); // constructing the Path normalizes the path part
URI uri = path.toUri(); // gives us access to scheme, authority, path
// ... run validation, inherit scheme and authority, or ignore them if not needed
Path newPath = new Path(newScheme, newAuthority, uri.getPath());
return newPath.toString();
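For instance, here is a hedged sketch of wiring that per-location logic into mapPartitions. The helper name, the single-column layout, and passing in the scheme/authority of the location being cleaned are assumptions for illustration, not the final implementation.

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical helper: normalize each reachable location, inheriting the scheme
// and authority of the location being cleaned when they are missing.
static Dataset<String> normalizeLocations(
    Dataset<Row> reachableFileDF, String rootScheme, String rootAuthority) {
  return reachableFileDF.mapPartitions(
      (MapPartitionsFunction<Row, String>) rows -> {
        List<String> normalized = new ArrayList<>();
        while (rows.hasNext()) {
          String location = rows.next().getString(0);
          Path path = new Path(location); // normalizes the path part of the URI
          URI uri = path.toUri();
          String scheme = uri.getScheme() != null ? uri.getScheme() : rootScheme;
          String authority = uri.getAuthority() != null ? uri.getAuthority() : rootAuthority;
          normalized.add(new Path(scheme, authority, uri.getPath()).toString());
        }
        return normalized.iterator();
      },
      Encoders.STRING());
}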

I know @karuppayya has been working on a fix so I wanted to make sure we build consensus first.

cc @karuppayya @RussellSpitzer @rdblue @flyrain @szehon-ho @jackye1995 @pvary @openinx @rymurr

@aokolnychyi
Contributor Author

A few recent issues reported by the community: #4194, #4161. I know there were more issues and a few PRs too. Feel free to link anything that may be related.

@aokolnychyi
Contributor Author

aokolnychyi commented Mar 16, 2022

One more point I forgot: I don't think we will be able to support symlinks. As far as I know, symlinks were never finished in HDFS and S3 does not support them either. Only the local file system may hit this issue, and I am not sure we should care about that.

@rdblue
Contributor

rdblue commented Mar 16, 2022

I think about this problem slightly differently. Rather than thinking of it as a lack of normalization, I think of it as the underlying file store having multiple ways to refer to the same file. That framing is a bit broader and covers cases like schemes that mean the same thing (s3 and s3a) and authority issues.

I think it's reasonable to have some way to normalize both sets of paths before comparing them. It is a little concerning to me that the proposal for doing that is to use Hadoop's path normalization and then optionally ignore certain parts. Delegating that to Hadoop doesn't seem like a good idea to me, but it is at least a start.

I think we should also have strategies for this that depend on the file system scheme. s3 and s3a are essentially the same thing, but you can't ignore authority (bucket) for S3. HDFS may have different namenodes in the authority and whether they're equivalent depends on the Hadoop Configuration. I'd like to get more specific about the file systems that are supported and how each one will normalize and compare.

@aokolnychyi
Contributor Author

I think it's reasonable to have some way to normalize both sets of paths before comparing them. It is a little concerning to me that the proposal for doing that is to use Hadoop's path normalization and then optionally ignore certain parts. Delegating that to Hadoop doesn't seem like a good idea to me, but it is at least a start.

This is mainly because we use the Hadoop FileSystem for listing. If we use proprietary logic to normalize paths, this may lead to handling edge cases differently. Normalization mostly solves cosmetic issues in the path part of URIs; it does not solve the scheme and authority mismatch.

I am happy to consider alternatives. In the future, we can customize DeleteOrphanFiles so that listing via FileSystem just becomes one way to build a list of actual files. For now, using Hadoop for normalization when we are doing listing via Hadoop seems reasonable to me.

I think we should also have strategies for this that depend on the file system scheme. s3 and s3a are essentially the same thing, but you can't ignore authority (bucket) for S3. HDFS may have different namenodes in the authority and whether they're equivalent depends on the Hadoop Configuration. I'd like to get more specific about the file systems that are supported and how each one will normalize and compare.

This is when it gets tricky. Hadoop conf in jobs that write to the table can be different from Hadoop conf in jobs that delete orphan files. Users can define arbitrary schemes or use different yet equivalent authorities.

Maybe the default values for the ignore options can depend on the scheme of the location we clean. For example, if the location we scan for orphan files starts with s3, we can ignore the scheme but have to compare the authority and normalized path. We can discuss default values more, but what about having ignore-scheme and ignore-authority in general? Do we consider that useful?

@anuragmantri
Contributor

Thanks for reviving this issue @aokolnychyi. I've been thinking about whether relative paths would help in this case. In my relative paths change, I have separated the table location into a prefix and a location. For example:

{
  "format-version" : 2,
  "table-uuid" : "24f86bb2-3473-4352-b8fb-375a55b0267b",
  "location" : "tbl",
  "location-prefix" : "file:/var/folders/wr/q_40znsn3_b0n0hx08v15dzr0000gn/T/hive5023759931126715387/hivedb.db/",
  "last-sequence-number" : 2,
  "last-updated-ms" : 1647492090039,
  "last-column-id" : 1,
  "current-schema-id" : 0,
...
}

Similarly, the location is also split into a prefix and a location in the catalog, with the ability to switch the prefix via an API.

Would this change help with the problem described here if we always pass a relative path in location => '...' in the remove_orphan_files() call and let the scheme and authority come from the location-prefix property above?

In the meantime, having ignore-* makes sense to me. I also agree with @rdblue that we need to find consensus on which defaults apply to which schemes.

@anuragmantri
Contributor

Thinking about relative paths more, I can see some issues:

  • It will not work for existing tables that do not use relative paths.
  • Relative paths themselves are designed to be optional.

So this will not work in all cases.

@flyrain
Contributor

flyrain commented Mar 17, 2022

Not sure if we already use CRC files. I'd think they could increase the reliability of the comparison. Here is the reason:

  1. We can NOT tell that two files are identical if their CRC values are the same, due to the high collision rate of CRC. But we can tell that two files are different if their CRCs differ, which is still useful. For example, if we know the CRC values of valid files a, b, c under a directory, we have the confidence to delete any file without a CRC or without a valid CRC value.
  2. Each file generated by Spark has its own CRC file, so we don't have to calculate checksums on the fly. We could even preload them into manifest files if needed.

@szehon-ho
Collaborator

szehon-ho commented Mar 22, 2022

A few recent issues reported by the community: #4194, #4161. I know there were more issues and a few PRs too. Feel free to link anything that may be related.

Found another PR on this: #2890

Need to think a bit about CRC.

On first thought, it seems the original ignoreScheme/ignoreAuthority proposal can solve the problem. But if the goal is to make RemoveOrphan safe (e.g., not remove all files if we change the HDFS authority, like today), then both need to default to true, and as Ryan says we need to decide the right values for every FileSystem we support, as that may not be the case for S3. As a personal thought, the flag names may cause some confusion (they could be read as: remove orphans ignores the prefix when checking which files to remove).

Maybe an alternative is to do it the other way around: the user has to force the delete when scheme/authority do not match to get the old behavior (which should be rare), or choose to skip them.

We could do this by distinguishing two sets in the inner Spark job and having it return:

  1. FileSystem files that do not match either the absolute or relative path of any reachable Iceberg file.
  2. FileSystem files that match the relative path, but not the absolute path, of a reachable Iceberg file (previously, these would be returned as orphans as well and silently deleted).

Then a flag "prefixMismatchMode" with values "error", "delete", "skip" controls what to do with the second set (default=error, throws an exception).

This seems like an easier UX to me, with no need to choose file-system-specific flags. The user in this case specifically chooses to skip if the prefix no longer matches. But performance is worse than the original proposal, as we would return a lot of results to the driver just to throw the exception; not sure if it's worth it.

@aokolnychyi
Contributor Author

@anuragmantri, I am not sure relative paths will help. They will be optional and we will still need to resolve them somehow to compare with absolute paths we get from listing. I assume we will have the same issues without normalization and with different yet equivalent authorities/schemes.

@flyrain, it is an interesting idea. However, is there an efficient way to compute these values for all files in a location? I assume it will require a request per listed file, making this extremely expensive. We may find a way to persist these values for referenced files for future use cases, but I am afraid we would need to send a request for every actual file that we get after listing a location, and that is going to be costly (not to mention that listing itself is already extremely expensive).

@szehon-ho, I can see your idea being implemented. If I understand correctly, it will behave like this:

  • Build a DF of reachable normalized paths with scheme/authority (if a file does not have either scheme or authority, inherit it from the location we clean).
  • Build a DF of reachable normalized paths without scheme/authority.
  • Build a DF of all actual files in the location with scheme/authority.
  • Find actual locations that don't match reachable normalized paths with scheme/authority. These are potentially orphan files.
  • Among potentially orphan files, find which of them match reachable files if we ignore their scheme and authority.
  • Act according to prefix-mismatch-mode, which can be error (default), delete, or ignore.

I think it is worth exploring this option. It is not perfect either, though. Cases where the bucket name (authority) is different would result in an exception. Also, are there scenarios where we want to compare authorities but not schemes?

What does everybody else think?

@szehon-ho
Collaborator

szehon-ho commented Mar 23, 2022

Yea, actually the idea is the same as your original, just that:

  • rename the flag
  • add the error mode (for information to user that they were going to delete valid files)
  • combine the authority/scheme into 'prefix'

Maybe delete => force-delete to make it clear they should not do this unless they are sure.

For the algorithm, yea that works. If we want to further optimize, we could even conditionally skip the prefix-less comparison for non-error mode, like in the original algorithm.

I'm open to changing this if the error mode proves too cumbersome to be useful. Initially I was thinking of it as a safety measure that users must turn off if trying to delete on a different absolute location than the one the files were written with. Users could go back to running RemoveOrphan with the default 'error' mode once the table is fixed with all locations on the new prefix. Maybe that can be done via RepairManifests with an option to rewrite the prefix, or once relative paths are there we can change the root location.

Yea, I didn't initially think to distinguish scheme/authority, just a prefix, which is different if either one differs. When the bucket is different, we should throw an exception, right? (The user is trying to clean a different bucket than the one the table initially wrote to.) Though I can see that the other case, where only the scheme is different (s3 => s3a), is debatable. I was thinking to avoid too many details in the config and just have users set prefix-mismatch-mode='ignore' in this case, but we could add another flag if we really need to. Again, the user could eventually fix s3 to s3a in the paths using some of these to-be-developed features.

@kbendick
Contributor

Also adding that I have been working with @bijanhoule on a patch that will allow users to provide their own dataframe of actual files, to skip the listing entirely if all of the files are already known - for example, if a cluster administrator is able to provide a list of files from HDFS or if a list of files is obtainable from the cloud provider (such as an S3 inventory list).

This is similar in that it allows us to avoid the listing, but it does put the onus on the user to provide a correct listing. This would be an alternative option in addition to the work mentioned above.

@aokolnychyi
Contributor Author

aokolnychyi commented Mar 30, 2022

@kbendick, I think being able to customize DeleteOrphanFiles with a custom way of obtaining actual files is a great feature but I find it orthogonal. No matter what way we use to get actual files, we still need to normalize the locations and decide what to do if the scheme/authority don't match.

I'd consider exposing some sort of a strategy in DeleteOrphanFiles that would allow users to customize not only how to obtain actual files but also other things (e.g. how to perform normalization). For example, I find it reasonable to use Hadoop for normalization if we use Hadoop for listing. If we rely on another way of computing actual files, maybe we should use something else for normalizing.

I think once we know what to do for the current implementation we can think of a way to make it pluggable.

@kbendick
Contributor

kbendick commented Mar 30, 2022

Agreed that making the source of the "actual files" list pluggable is orthogonal. My apologies for bringing it up here, as it's more related to making DeleteOrphanFiles more reliable by avoiding the list operation on the entire object store.

I would propose, since I know that it's mostly working and rather simple, that we consider adding a way to supply a source other than the Hadoop-based listing as an additional option. Right now, it's simply another table that can be referenced, containing the actual files of the file store.

Whether things like prefix normalization are applied to the table's file listing or to the list of actual files would be outside the scope of a user-provided list of actual files.

For example, for tables entirely on S3 in one bucket, normalizing the table's files to the s3 or s3a scheme is probably the most common concern that average users face in practice.

Whether the normalization to s3 or s3a is done on the table's file list or on the list of actual files, the user could still provide a list of actual files from a more definitive source that has been properly adjusted to use s3 or s3a.

We can open a PR for review to better show what is meant. But I don't think that the normalization work needs to be completed before we make it pluggable in this way. The normalization work would naturally layer on top of this, as it simply skips one small part of the action's execute method.

It will be clearer what is meant once the work is put up, but it is a rather small change that provides a very significant benefit to a lot of users right away - avoiding the listing of the entire file store if a more definitive source of truth is available.

@aokolnychyi
Contributor Author

aokolnychyi commented Apr 13, 2022

@kbendick, I agree it is useful to supply locations instead of relying on listing. I believe there is an open PR that can be merged prior to any work discussed here.

@karuppayya and I spent some time discussing and I personally think @szehon-ho's idea with having an error mode is quite promising. I'd probably have only error and ignore modes and combine it with other ideas mentioned on this thread.

  • Normalize the path part of URIs to avoid cosmetic differences like extra slashes.
  • Introduce prefix-mismatch-mode option. Possible values are error (default) and ignore.
  • Expose ways to influence the comparison. For instance, allow passing equivalent schemes.

I like this approach because it will throw an exception if something suspicious happens and will provide the user with ways to resolve conflicts instead of silently taking some action.

The actual algorithm can be like this:

  • Build actual file DF
    • Either provided by the user or acquired via listing. If listing, the location must contain a scheme and authority.
  • Build reachable file DF via metadata tables
  • Transform both actual and reachable DFs so that they contain scheme, authority, path columns.
  • Perform LEFT OUTER JOIN on path and map partitions.
| actual_scheme | actual_authority | path | valid_scheme | valid_authority | path | result |
|---------------|------------------|------|--------------|-----------------|------|--------|
| s3 | bucket1 | p0 | null | null | null | orphan (no match for the normalized path) |
| s3 | bucket1 | p1 | null | null | p1 | not orphan (null scheme/authority in metadata matches any scheme/authority) |
| s3 | bucket1 | p2 | s3a | bucket1 | p2 | not orphan (must have defaults for equivalent schemes like s3 and s3a) |
| s3 | bucket1 | p3 | s3a | bucket2 | p3 | error by default; can be ignored, or the user may indicate that bucket1 and bucket2 are different, which will make (s3, bucket1, p3) an orphan |

This way, we don't need separate DFs with and without the prefix and can have a more sophisticated comparison.
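A hedged sketch of that join, assuming both DataFrames have already been split into scheme/authority/path columns; the DataFrame and column names are assumptions for illustration, not the final implementation.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical inputs:
//   actualFileDF:    (actual_scheme, actual_authority, path) from listing or user input
//   reachableFileDF: (valid_scheme, valid_authority, path) from metadata tables
static Dataset<Row> joinOnPath(Dataset<Row> actualFileDF, Dataset<Row> reachableFileDF) {
  Column onPath = actualFileDF.col("path").equalTo(reachableFileDF.col("path"));
  return actualFileDF.join(reachableFileDF, onPath, "left_outer");
}

// Each joined row is then classified per the table above, e.g. in mapPartitions:
//   no match on path                -> orphan
//   null valid scheme/authority     -> not orphan (metadata location had no prefix)
//   equivalent schemes/authorities  -> not orphan (e.g. s3 vs s3a)
//   conflicting scheme or authority -> error by default, or handled per prefix-mismatch-mode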

Any thoughts?

@rdblue
Contributor

rdblue commented Apr 13, 2022

@aokolnychyi, that plan sounds great to me. I think that covers all the cases we need to.

@szehon-ho
Collaborator

Yea, it sounds good.

If I understand correctly, we will have a pluggable way to specify "prefix equivalencies", and default ones like :

  • S3/S3A => schemes are equal
  • HDFS scheme => ignore authority

Then prefix-mismatch-mode = error will cause an error if the prefixes are still different after applying these equivalencies.

@aokolnychyi
Contributor Author

I was thinking about giving up on the ignore-* options and instead exposing two options in the procedure that would be translated into a Map<String, List<String>> in the underlying action:

equal_schemes
equal_authorities

That does not solve one case: when the buckets are indeed different but the path is the same, and we have a valid orphan file.

s3://bucket1/path/to/file.parquet
s3://bucket2/path/to/file.parquet

For that purpose, we can expose unequal_authorities, unless anyone has better ideas.
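For illustration, a hedged sketch of the Map<String, List<String>> shape those procedure options might be translated into; the keys, values, and option spellings are assumptions for the example, not a finished API.

import java.util.List;
import java.util.Map;

// Hypothetical translation of the equal_schemes / equal_authorities procedure
// options into the maps used by the underlying action (illustrative values only).
static final Map<String, List<String>> EQUAL_SCHEMES = Map.of(
    "s3", List.of("s3a", "s3n"));           // schemes to treat as equivalent to s3
static final Map<String, List<String>> EQUAL_AUTHORITIES = Map.of(
    "nn1:8020", List.of("1.2.3.4:8020"));   // the same namenode known by two names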

@szehon-ho
Collaborator

szehon-ho commented Apr 14, 2022

That does not solve one case: when the buckets are indeed different but the path is the same, and we have a valid orphan file.

Sorry, I'm missing this example; why does equalAuthority=false not handle this case, and why do we need more than two flags?

One thought to keep the prefix-mismatch-mode flag simple with only error/ignore is to have a separate PrefixComparator:
equals(String thisScheme, String thatScheme, String thisAuth, String thatAuth)
with built-in S3/HDFS defaults, which can be configured.

It would normally not need configuration from the user, but it could allow the user to customize it (say, restrict HDFS authority equality to specific pairs depending on namenode mappings). For example:

prefix.mismatch.mode=error
prefix.comparator.authority.equals=nn1, 1.2.3.4:8000 // * for all equal, unset it for all unequal?
prefix.comparator.scheme.equals=s3,s3a
prefix.comparator=... // if we need to override the entire comparator?

But it's up to you, if this is overkill.
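A hedged sketch of what such a comparator could look like; PrefixComparator and the default rules below are the proposal being discussed, not an existing Iceberg interface, and the sketch assumes non-null, already-normalized inputs.

// Hypothetical interface following the proposal above (not an existing API).
interface PrefixComparator {
  boolean prefixesEqual(String thisScheme, String thatScheme, String thisAuth, String thatAuth);
}

// A possible built-in default: treat s3/s3a/s3n as equivalent schemes and
// ignore the authority for hdfs, where equivalent namenode names may differ.
static final PrefixComparator DEFAULTS = (thisScheme, thatScheme, thisAuth, thatAuth) -> {
  boolean bothS3 = thisScheme.startsWith("s3") && thatScheme.startsWith("s3");
  boolean bothHdfs = "hdfs".equals(thisScheme) && "hdfs".equals(thatScheme);
  boolean schemesEqual = bothS3 || thisScheme.equalsIgnoreCase(thatScheme);
  boolean authoritiesEqual = bothHdfs || thisAuth.equalsIgnoreCase(thatAuth);
  return schemesEqual && authoritiesEqual;
};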

@kbendick
Contributor

@aokolnychyi, that plan sounds great to me. I think that covers all the cases we need to.

Yeah this all sounds good to me as well.

The nuances and different configurations brought up between the two should be sorted out, but at a high level what's been proposed covers all the issues that I have, as well as various concerns that I've heard brought up in the community.

@steveloughran
Contributor

FWIW, s3a and abfs in the not-yet-released Hadoop branch-3.3 add an EtagSource interface which FileStatus/LocatedFileStatus subclasses can implement. This lets you compare files: if the value is non-null/non-empty, then files with different etags are guaranteed to be different.

I know that Iceberg likes to build against very old versions of Hadoop, but if you leave space in the indices for file etags, plus some pluggable mechanism to retrieve them, then etag-based checking would work.

Note also that S3 and ABFS return those etags in list operations, so there's no need to do HEAD calls on each file. I think GCS does the same, though it doesn't have support through its client yet.
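A hedged sketch of how an etag check could be layered in on Hadoop versions that ship EtagSource; whether and how Iceberg would store known etags is an open question, so the knownEtag parameter here is an assumption for illustration.

import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.FileStatus;

// Hypothetical helper: a listed file is definitely a different object than the
// known one if both sides expose non-empty etags and those etags differ.
// Requires a Hadoop version whose FileStatus subclasses implement EtagSource.
static boolean definitelyDifferent(FileStatus listed, String knownEtag) {
  if (listed instanceof EtagSource && knownEtag != null && !knownEtag.isEmpty()) {
    String listedEtag = ((EtagSource) listed).getEtag();
    return listedEtag != null && !listedEtag.isEmpty() && !listedEtag.equals(knownEtag);
  }
  return false; // no etag information on one side, cannot conclude anything
}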

@aokolnychyi
Contributor Author

This issue has been resolved in #4652 by adding a prefix mismatch mode and ways to provide equal schemes and authorities. I am going to close this one. Please feel free to re-open if any new issues come up.

Thanks everyone who participated!
