API, Core: Add schema_id to ContentFile #4898
Conversation
@ConeyLiu, thanks for breaking this up, but this is still far too large for a single PR. There's no need to update all of the writers in a single PR. Instead, this should focus on core and API classes and just read null values for the new field. You can add writes later.

Hi @rdblue, thanks for the review. I have reverted the changes for the writers. Please take another look, thanks a lot.
szehon-ho
left a comment
Actually, now I realize this is a change in the spec; initially I thought it was something we could derive from existing metadata.
Is this something we can do, or do we need to wait until V3?

@szehon-ho, this is something we can do because it is backward compatible. Older readers will ignore new fields, so we can add it safely. And if we don't have a schema ID for a column, then we just return null and skip the optimization.
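For illustration, a minimal sketch of that fallback, assuming the read path keeps a map from schema id to Schema and some schema-based check (keepBySchema, schemasById, SchemaEvaluator, and the -1 default are illustrative names and assumptions, not the PR's actual code; DataFile, Schema, and Map are the usual org.apache.iceberg and java.util types):

// Hypothetical helper: keep a file unless we have a known schema id and the schema-based check rejects it.
static boolean keepBySchema(DataFile file, Map<Integer, Schema> schemasById, SchemaEvaluator schemaEvaluator) {
  int schemaId = file.schemaId();   // assumed to default to -1 when the metadata predates the field
  if (schemaEvaluator == null || schemaId < 0 || !schemasById.containsKey(schemaId)) {
    return true;                    // no schema information: skip the optimization, keep the file
  }
  return schemaEvaluator.eval(schemasById.get(schemaId));
}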
A little busy recently. Will address the comments tomorrow.

Thanks @rdblue @szehon-ho for the review. Comments have been addressed.
szehon-ho
left a comment
I wonder, is it possible to add a test to try to deserialize an older manifest entry without schema_id?
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
It seems we need to implement customized V1Writer/V2Writer and those ...
Yea, there is some limited discussion in this related issue, but I guess no good conclusions: #2542. Maybe a test writer that creates metadata files with all optional columns as null? That way we can test all the new columns at once. By the way, the change mostly looks good, and it's good that you made the new field optional to avoid an issue like the one mentioned. I just don't know enough about the serialization/deserialization code to be sure there are no other problems with previously serialized metadata, so I was hoping to have a test to verify it. Though I can leave it to @rdblue to approve if he is confident, and we can tackle the backward-compat tests on the side.
Thanks @szehon-ho for the review and suggestion.
I will add the test later.
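For illustration, a rough sketch of the kind of backward-compatibility test suggested above; writeOldStyleManifest, FILE_A, and FILE_IO are hypothetical fixtures, and it assumes the new field reads back as -1 when it is missing from old metadata:

@Test
public void testReadManifestEntryWithoutSchemaId() throws IOException {
  // write a manifest whose data_file struct has no schema_id column (old metadata layout)
  ManifestFile manifest = writeOldStyleManifest(FILE_A);
  try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
    for (DataFile file : reader) {
      // the new optional field should come back as the "unknown" default instead of failing
      Assert.assertEquals(-1, file.schemaId());
    }
  }
}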
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
Integer sortOrderId = 2;

String fileName = String.format("OldManifestFileV%s.avro", formatVersion);
The OldManifestFileV1.avro/OldManifestFileV2.avro files were written with the previous DataFile spec (the file instance is https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java#L74). Hi @szehon-ho, I think this test covers your concern.
It's really great to have this test. I was initially thinking that we would have a TestV1Writer / TestV2Writer that writes all the optional fields as null, instead of checking in the old-version Avro files. Is that possible? I can also take a look myself to see if that is possible.
I am not sure I have fully understood your suggestion. In this PR, we not only add a new field to DataFile but also change the StructType returned by DataFile.getType. Wouldn't we need to customize DataFile/V1Metadata/V2Metadata to write the DataFile with the old spec?
Yea, I was thinking to make a test version of those writers... not sure if it's possible. Anyway, I guess this works too; the only issue is it won't be very debuggable if something goes wrong.
Hi @rdblue @szehon-ho, I am sorry for the late update. The compatibility test has been added. Hopefully you can take another look when you are free.
public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String location,
                            PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
                            SortOrder sortOrder, int... equalityFieldIds) {
  this(appender, format, location, spec, partition, keyMetadata, sortOrder, -1, equalityFieldIds);
It's not that great to have to add a new constructor to all of these. As we already have WriterFactory, which abstracts this and should be the one getting called, I wonder if we can just change that interface? cc @rdblue @aokolnychyi
This is a public class and constructor; I think we should keep compatibility.
I'm not a huge fan of it; I think we can have a builder like SparkAppenderFactory (where we did some refactoring to avoid having one constructor per new argument).
Should we do it in this patch or a separate patch?
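For illustration, a minimal sketch of the builder idea under discussion, assuming the nine-argument constructor shown above stays in place; the builder class and its setters are hypothetical, not an existing Iceberg API, and the Iceberg types (FileAppender, PartitionSpec, etc.) are the usual org.apache.iceberg imports:

// Hypothetical fluent builder so a new writer argument gets a default instead of a new constructor.
class EqualityDeleteWriterBuilder<T> {
  private FileAppender<T> appender;
  private FileFormat format;
  private String location;
  private PartitionSpec spec;
  private StructLike partition;
  private EncryptionKeyMetadata keyMetadata;
  private SortOrder sortOrder;
  private int schemaId = -1;          // new field defaults to "unknown"
  private int[] equalityFieldIds;

  EqualityDeleteWriterBuilder<T> appender(FileAppender<T> newAppender) {
    this.appender = newAppender;
    return this;
  }

  EqualityDeleteWriterBuilder<T> schemaId(int newSchemaId) {
    this.schemaId = newSchemaId;
    return this;
  }

  // ... one setter per remaining field, omitted for brevity ...

  EqualityDeleteWriter<T> build() {
    return new EqualityDeleteWriter<>(
        appender, format, location, spec, partition, keyMetadata, sortOrder, schemaId, equalityFieldIds);
  }
}

A writer-factory method taking the schema id would work just as well; the point is that callers stop depending on the exact constructor arity.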
Hey, I'm sorry for the late response, as I'm definitely not the most familiar with this part of the code to review it. I was chatting with @dramaticlly and he could possibly help take a look at the idea of making a cleaner backward-compatibility test in general, over at #2542. I don't want to block the change until that's there, but for me it's more reassuring to have those tests overall :)
Also, I noticed that the spec-id and schema are already written in the header of each manifest. As far as I can tell, it seems to be the right one that the manifest was written with, even after rewriteManifests. Wondering at a high level, is it adequate for the optimization you are planning?
Hi @szehon-ho thanks for reviewing this again.
What happens after the data files are rewritten? It seems the schema of the manifest file is the current table schema, and the manifest entries in the same manifest file could have different schemas after the rewrite.
@ConeyLiu that's a good question. I think (may be wrong) rewriteDataFiles groups files by partition/partition spec and may not preserve the old schemas, i.e. all the data files are rewritten with the latest schema of that partition spec. I think the situation would be the same even in your proposal to add a new schema-id field to data_file, right? After rewriteDataFiles we would have to carry over the latest schema-id of each spec in order for your initially proposed optimization to be accurate, because there may be data in the new file that was written with a later schema.
You are correct. The data files are written with the new spec after the rewrite, so we cannot benefit from the schema evaluation because we lose the original schema information.
In RewriteManifests, we use the current table partition spec or the spec selected by spec ID. I think the schema used with the current spec is not the same one as the original schema of the old manifest file, because we rewrite the partition spec when updating the table schema. Please correct me if I am wrong.
Yea, you are right; it seems it will set the latest schema of each spec on the rewritten manifests, so the information is lost if you evolve schemas within a spec.

@szehon-ho @rdblue Any update here?
szehon-ho
left a comment
Yea, the general direction makes sense to me. But as it's changing the spec, I would love to get another opinion as well. Pinged @aokolnychyi on this if he has time.
Will this mean all evaluator logic will have to change to be schema-specific? Is there a simple example of how this will be consumed?
Thanks @szehon-ho @aokolnychyi
We don't need to change the existing evaluators; we just need to create a new schema evaluator and apply it when filtering manifest entries, for example:

return CloseableIterable.filter(
    open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
    entry -> {
      boolean keep = entry != null;
      if (keep && schemaEvaluator != null && entry.file().schemaId() > -1) {
        // evaluate based on the schemaId
        keep = schemaEvaluator.eval(schemasById.get(entry.file().schemaId()));
      }
      return keep &&
          evaluator.eval(entry.file().partition()) &&
          metricsEvaluator.eval(entry.file()) &&
          inPartitionSet(entry.file());
    });
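To make the snippet above concrete, one possible shape for such a schema evaluator is sketched below; this is only an illustration of the idea (not code from the PR), it assumes the filter contains only predicates that cannot match an all-null column, and it uses java.util.Set and org.apache.iceberg.Schema:

// Hypothetical evaluator: keep a file only if every field id referenced by the filter
// existed in the schema the file was written with; otherwise that column is entirely
// null in the file and (for null-rejecting predicates) the file can be pruned.
class SchemaEvaluator {
  private final Set<Integer> filterFieldIds;

  SchemaEvaluator(Set<Integer> filterFieldIds) {
    this.filterFieldIds = filterFieldIds;
  }

  boolean eval(Schema fileSchema) {
    for (int fieldId : filterFieldIds) {
      if (fileSchema.findField(fieldId) == null) {
        return false;   // the column did not exist when the file was written
      }
    }
    return true;
  }
}

In the filter above, schemasById would map each table schema id to its Schema, so this cost is only paid for entries that actually carry a schema id.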
Hi @rdblue @szehon-ho @aokolnychyi, do you have any time to look at this again?
CONTENT,
FILE_PATH,
FILE_FORMAT,
SCHEMA_ID,
@szehon-ho, if you are concerned about this ordering, we could put SCHEMA_ID last to align with the other new fields. Then we wouldn't need to update as many get/put methods in BaseFile, and some of the Spark/Flink UTs would not need to be updated either.
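A tiny sketch of why the position matters (not the actual BaseFile code, just a simplified stand-in): BaseFile addresses columns by position, so inserting SCHEMA_ID in the middle shifts every later index, while appending it keeps the existing cases and position-based tests intact.

// Illustrative positional setter; the field names and layout are simplified.
class PositionalFileStruct {
  private Object content;     // position 0: CONTENT
  private Object filePath;    // position 1: FILE_PATH
  private Object fileFormat;  // position 2: FILE_FORMAT
  private Object schemaId;    // appended last, so positions 0..2 keep their old meaning

  void put(int pos, Object value) {
    switch (pos) {
      case 0: this.content = value; break;
      case 1: this.filePath = value; break;
      case 2: this.fileFormat = value; break;
      case 3: this.schemaId = value; break;   // SCHEMA_ID, the new last position
      default: throw new UnsupportedOperationException("Unknown field position: " + pos);
    }
  }
}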
Moved it to the last position.
These two manifest files were written with the old spec. They are used to test that a reader using the new spec can read files written with the old spec.
EQUALITY_IDS,
SORT_ORDER_ID,
SCHEMA_ID);
Put it in the last position to reduce the code changes.
Hi @rdblue @szehon-ho @aokolnychyi @RussellSpitzer @nastra, could you help review this? This should be useful for tables with frequent column additions or deletions.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This is the first part of #4842. It adds the schema id to DataFile/DeleteFile/ManifestFile, which can be used to evaluate filter expressions based on the schema the file was written with.
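For reference, a sketch of what the proposed accessor could look like on the content-file interface; the exact signature and default value are this PR's proposal and may change in review, and the existing methods are only hinted at in a comment:

// Sketch of the proposed addition (not released Iceberg API).
public interface ContentFile<F> {
  // ... existing methods such as specId(), path(), format(), partition() ...

  /**
   * Returns the id of the table schema the file was written with,
   * or -1 when the metadata was written before this field existed.
   */
  int schemaId();
}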