Spark: Support writing shredded variant in Iceberg-Spark #14297
Conversation
Force-pushed from 16b7a09 to dc4f72e
Force-pushed from 97851f0 to b87e999
@amogh-jahagirdar @Fokko @huaxingao Can you help take a look at this PR and see whether there is a better approach for this?
cc @RussellSpitzer, @pvary and @rdblue It seems better to have the implementation follow the new File Format proposal, but I want to check whether this is an acceptable approach as an interim solution, or whether you see a better alternative.
@aihuaxu: Don't we want to do the same, but instead of wrapping the [...]? Would this be prohibitively complex?
In Spark DSv2, planning/validation happens on the driver. For a shredded variant, we don't know the shredded schema at planning time; we have to inspect some records to derive it, and doing a read on the driver during planning isn't practical. Because of that, the current proposed Spark approach is: put the logical variant type in the writer factory; on the executor, buffer the first N rows, infer the shredded schema from the data, then initialize the concrete writer and flush the buffer. I believe this PR follows the same approach, which seems like a practical solution to me given DSv2's constraints.
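For readers skimming the thread, here is a minimal sketch of the buffer-then-initialize pattern described above, not the PR's actual code; the sample size, ShreddedSchema, and RealWriter types are placeholders.

```java
// Minimal sketch (not the PR's code) of a writer that buffers rows, infers the
// shredded schema from the sample, then creates the real writer and flushes.
import java.util.ArrayList;
import java.util.List;

class LazyShreddingWriter<T> {
  private static final int SAMPLE_SIZE = 1000; // assumed buffering threshold

  private final List<T> buffer = new ArrayList<>();
  private RealWriter<T> delegate; // created only after the schema is known

  void write(T row) {
    if (delegate != null) {
      delegate.write(row);
      return;
    }
    buffer.add(row);
    if (buffer.size() >= SAMPLE_SIZE) {
      initialize();
    }
  }

  void close() {
    if (delegate == null) {
      initialize(); // fewer rows than the sample size: infer from what we have
    }
    delegate.close();
  }

  private void initialize() {
    ShreddedSchema schema = ShreddedSchema.inferFrom(buffer); // placeholder inference
    this.delegate = new RealWriter<>(schema);
    buffer.forEach(delegate::write); // flush the buffered rows
    buffer.clear();
  }

  // Minimal placeholder types so the sketch stands on its own.
  static final class ShreddedSchema {
    static <R> ShreddedSchema inferFrom(List<R> sample) { return new ShreddedSchema(); }
  }

  static final class RealWriter<R> {
    RealWriter(ShreddedSchema schema) {}
    void write(R row) {}
    void close() {}
  }
}
```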
Thanks for the explanation, @huaxingao! I see several possible workarounds for the DataWriterFactory serialization issue, but I have some more fundamental concerns about the overall approach. Even if we accept that the written data should dictate the shredding logic, Spark's implementation, while dependent on input order, is at least somewhat stable: it drops rarely used fields, handles inconsistent types, and limits the number of columns.
Thanks @huaxingao and @pvary for reviewing, and thanks to Huaxin for explaining how the writer works in Spark. Regarding the concern about unstable schemas, Spark's approach makes sense.
We could implement similar heuristics. Additionally, making the shredded schema configurable would allow users to choose which fields to shred at write time based on their read patterns. For this POC, I'd like feedback on whether there are any significant high-level design options to consider first and whether this approach is acceptable. It still seems hacky; I may have missed the big picture of how the writers work across Spark + Iceberg + Parquet, and there may be a better way.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This PR caught my eye, as I've implemented the equivalent in DuckDB: duckdb/duckdb#19336. The PR description doesn't give much away, but I think the approach is similar to the proposed (interim) solution here: buffer the first row group, infer the shredded schema from this, then finalize the file schema and start writing data. We've opted to create a [...]. We've also added a copy option to force the shredded schema, for debugging purposes and for power users.
As for DECIMAL, it's kind of a special case in the shredding inference: we only shred on a DECIMAL type if all the decimal values we've seen for a column/field have the same width and scale; if any decimal value differs, DECIMAL won't be considered anymore when determining the shredded type of the column/field.
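For illustration only, here is the DECIMAL rule described above sketched in Java (DuckDB's actual implementation is C++ and will differ); "width" is taken here to mean precision.

```java
// Sketch of the "only shred DECIMAL if every value agrees on precision and scale"
// rule described above. BigDecimal stands in for the variant decimal values.
import java.math.BigDecimal;
import java.util.List;

class DecimalShredCheck {
  /** Returns true only if all decimals share the same precision and scale. */
  static boolean canShredAsDecimal(List<BigDecimal> values) {
    Integer precision = null;
    Integer scale = null;
    for (BigDecimal v : values) {
      if (precision == null) {
        precision = v.precision();
        scale = v.scale();
      } else if (v.precision() != precision || v.scale() != scale) {
        return false; // widths differ: stop considering DECIMAL for this field
      }
    }
    return precision != null;
  }
}
```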
This PR is super exciting! Regarding the heuristics - I'd like to propose adding table properties as hints for variant shredding.
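As a rough sketch of what such a hint could look like; the property name write.variant.shredding.columns is invented for illustration and is not an existing Iceberg table property.

```java
// Hypothetical table-property hint for variant shredding.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ShreddingHints {
  // Invented property name for illustration only.
  static final String SHRED_COLUMNS = "write.variant.shredding.columns";

  /** Parses a comma-separated list of variant fields the user wants shredded. */
  static List<String> shredColumns(Map<String, String> tableProperties) {
    String value = tableProperties.getOrDefault(SHRED_COLUMNS, "");
    if (value.isEmpty()) {
      return List.of();
    }
    return Arrays.stream(value.split(","))
        .map(String::trim)
        .collect(Collectors.toList());
  }
}
```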
That is correct.
I'm still trying to improve the heuristics to use the most common type as the shredding type rather than the first one, and probably cap the number of shredded fields, etc., but a field doesn't need a 100% consistent type to be shredded.
Yeah, I think that makes sense for advanced users who want to determine the shredded schema, since they may know the read pattern.
Why is DECIMAL special here? If we determine DECIMAL4 to be the shredded type, then we may shred as DECIMAL4, or not shred values that cannot fit in DECIMAL4, right?
Yeah, I'm thinking of that too; I will address it separately. Basically, based on the read pattern, the user can specify the shredding schema.
gkpanda4 left a comment
When processing JSON objects containing null field values (e.g., {"field": null}), the variant shredding creates schema columns for these null fields instead of omitting them entirely. This would cause schema bloat.
Adding a null check in ParquetVariantUtil.java:386 in the object() method should fix it.
Force-pushed from b87e999 to 2e81d79
Force-pushed from 2e81d79 to 7e1b608
I addressed this null value check in VariantShreddingAnalyzer.java instead: if the value is NULL, we will not add the shredded field.
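Conceptually, the fix reads something like the sketch below; VariantValue and FieldStats are placeholders, not the actual VariantShreddingAnalyzer internals.

```java
// Conceptual sketch of skipping null-valued fields during schema inference,
// so that {"field": null} does not create a shredded column.
import java.util.HashMap;
import java.util.Map;

class NullAwareAnalyzer {
  private final Map<String, FieldStats> fields = new HashMap<>();

  void observe(String fieldName, VariantValue value) {
    if (value == null || value.isNull()) {
      return; // null values do not create a shredded column, avoiding schema bloat
    }
    fields.computeIfAbsent(fieldName, name -> new FieldStats()).record(value.type());
  }

  // Placeholder types for the sketch.
  interface VariantValue {
    boolean isNull();
    String type();
  }

  static final class FieldStats {
    void record(String type) {}
  }
}
```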
Force-pushed from 7e1b608 to b74addb
Force-pushed from 7c805f6 to 67dbe97
Force-pushed from 67dbe97 to 5c0533e
}
}

PhysicalType getMostCommonType() {
How does this logic work when a field has the same count for several different data types, e.g. 2 each of INT8, STRING, and DECIMAL8?
Will this logic choose one at random?
I have the same question. I think we should add an explicit deterministic tie-break and also add a regression test that creates a perfect tie to ensure inference is stable.
What we can do is, when there is a tie, choose the last type (ordering by type name) so the shredding type is consistent.
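A sketch of that deterministic tie-break, using plain type-name counts rather than the real PhysicalType enum: highest count wins, and ties break on the type name so the result does not depend on input order.

```java
// Sketch of a deterministic tie-break for the most common type.
import java.util.Map;

class MostCommonType {
  static String mostCommonType(Map<String, Long> countsByTypeName) {
    return countsByTypeName.entrySet().stream()
        .max(
            Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey())) // tie-break on name
        .map(Map.Entry::getKey)
        .orElse(null);
  }
}
```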
 * <li>shred to the most common type
 * </ul>
 */
public class VariantShreddingAnalyzer {
Can you add a test scenario for a field where values like ZIP codes (98101, 97201, and 10001) get parsed as different integer types (INT32 + INT16)?
Would a type family check make more sense? Like grouping them as:
- Integer family: INT8, INT16, INT32, INT64 → promote to the most capable type
- Decimal family: DECIMAL4, DECIMAL8, DECIMAL16 → promote to the most capable type
- Boolean family: TRUE, FALSE → treat as a single boolean type
This is a bit in line with the Spark-side implementation https://github.com/apache/spark/pull/52406/files#diff-fb3268e5296f089d5f57c168f3e9cd74a401b184db3f30982588a134d8abfa53R322-R326, where all integer types are converted to Long.
I added the promotion for the integer family and the decimal family. True/false is already treated as a single boolean type.
For the decimal type, right now when the scale doesn't match, it will not rescale and will just write the non-shredded value. I can address that separately; it seems we need to introduce a DecimalWriter to handle such scale conversion.
Please take another look.
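A sketch of the family-promotion idea under discussion; SimpleType is an illustrative enum, not the real variant PhysicalType.

```java
// Illustrative sketch of promoting within a type family to the widest member
// seen, so e.g. INT16 + INT32 shreds as INT32 rather than not shredding at all.
import java.util.List;

class TypePromotion {
  enum SimpleType { INT8, INT16, INT32, INT64, DECIMAL4, DECIMAL8, DECIMAL16, STRING }

  static SimpleType promote(List<SimpleType> observed) {
    SimpleType widest = null;
    for (SimpleType t : observed) {
      if (widest == null) {
        widest = t;
      } else if (sameFamily(widest, t)) {
        // enum order goes from narrow to wide within each family
        widest = t.ordinal() > widest.ordinal() ? t : widest;
      } else {
        return null; // mixed families: fall back to the non-shredded value column
      }
    }
    return widest;
  }

  private static boolean sameFamily(SimpleType a, SimpleType b) {
    return family(a).equals(family(b));
  }

  private static String family(SimpleType t) {
    switch (t) {
      case INT8: case INT16: case INT32: case INT64: return "integer";
      case DECIMAL4: case DECIMAL8: case DECIMAL16: return "decimal";
      default: return t.name();
    }
  }
}
```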
if (parquetCompressionLevel != null) {
  writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
writeProperties.put(SparkSQLProperties.SHRED_VARIANTS, String.valueOf(shredVariants()));
Nit: shredVariants() is evaluated twice. Could we store it in a local boolean shredVariants = shredVariants()?
@Override
public void setColumnStore(ColumnWriteStore columnStore) {
  // Ignored for lazy initialization - will be set on actualWriter after initialization
setColumnStore is currently a no-op. That's fine during the buffering phase, but after actualWriter is initialized, Parquet will call setColumnStore again for new row groups. Should we forward the store to actualWriter when it's non-null (e.g. if (actualWriter != null) actualWriter.setColumnStore(columnStore);) to avoid writing to a stale store?
Also, can we add a regression test that forces multiple row groups (e.g., a tiny row-group size) to ensure the writer remains correct across row-group rollover?
You are right. Updated to set the columnStore for new row groups and added test coverage.
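For reference, the forwarding pattern being discussed looks roughly like this; ColumnStore and InnerWriter are placeholders rather than the actual Parquet/Iceberg writer types.

```java
// Sketch of forwarding setColumnStore to the real writer once it exists,
// while remembering the latest store during the buffering phase.
class LazyColumnStoreForwarding {
  interface ColumnStore {}

  interface InnerWriter {
    void setColumnStore(ColumnStore store);
  }

  private InnerWriter actualWriter; // null until the shredded schema is inferred
  private ColumnStore latestStore;  // kept so late initialization can pick it up

  void setColumnStore(ColumnStore columnStore) {
    this.latestStore = columnStore;
    if (actualWriter != null) {
      // after initialization, this is called again for each new row group
      actualWriter.setColumnStore(columnStore);
    }
  }

  void onInitialized(InnerWriter writer) {
    this.actualWriter = writer;
    if (latestStore != null) {
      writer.setColumnStore(latestStore);
    }
  }
}
```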
64,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
null,
rowGroupOrdinal);
We're constructing a new ColumnChunkPageWriteStore with a hardcoded column index truncate length (64) and fileEncryptor = null. Should we instead reuse the ParquetWriter's configured values (truncate length / encryption) to avoid behavior differences when variant shredding is enabled? Also, shall we add a small regression test that enables Parquet encryption (or sets a non-default truncate length)?
Yeah. I forgot to update this part. Let me get the configured value.
I updated it. Please take another look.
writeNull(valueWriter, repetitionLevel, valueDefinitionLevel);
} catch (IllegalArgumentException e) {
Any way we can avoid having to try/fail here? Just wondering: if it's just for decimals, can we do some decimal-specific check, or is this just much simpler?
This is simpler. We could probably introduce a DecimalWriter to handle it by checking whether there is a scale mismatch and falling back to writing to the value field.
Do you prefer that approach?
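If the decimal-specific check were introduced, it might look roughly like this sketch; ShreddedDecimalWriter and FallbackWriter are hypothetical types, not the PR's classes.

```java
// Sketch of checking the decimal scale up front instead of relying on an
// IllegalArgumentException to decide between shredded and unshredded output.
import java.math.BigDecimal;

class DecimalFallback {
  interface ShreddedDecimalWriter {
    int scale();
    void write(BigDecimal value);
  }

  interface FallbackWriter {
    void writeUnshredded(BigDecimal value);
  }

  static void writeDecimal(
      BigDecimal value, ShreddedDecimalWriter typedWriter, FallbackWriter valueWriter) {
    if (value.scale() == typedWriter.scale()) {
      typedWriter.write(value); // fits the shredded typed column
    } else {
      valueWriter.writeUnshredded(value); // scale mismatch: keep it in the value column
    }
  }
}
```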
@Override
public void add(T value) {
  if (model instanceof WriterLazyInitializable lazy) {
This kind of feels to me like we should be making a subclass of ParquetWriter which only takes lazy models, rather than doing a special-case init here based on type matching.
Another possibility: can we create the ParquetWriter after buffering? The timeline now just seems a little odd:
Make Writer, Start buffering data, InitWriter
I haven't walked through this, but I would have thought we would do something like:
Buffer Data, Make Writer
Let me try out that refactoring.
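The reordering suggested above would look roughly like this at the call site; the row handling, ShreddedSchema, and FileWriter types are placeholders rather than the PR's classes.

```java
// Sketch of the "buffer data first, then make the writer" ordering.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BufferThenCreate {
  static <R> void writeAll(Iterator<R> rows, int sampleSize) {
    // 1. Buffer a sample before any writer exists.
    List<R> sample = new ArrayList<>();
    while (rows.hasNext() && sample.size() < sampleSize) {
      sample.add(rows.next());
    }

    // 2. Infer the shredded schema, then construct the writer with it.
    ShreddedSchema schema = ShreddedSchema.inferFrom(sample);
    try (FileWriter<R> writer = new FileWriter<>(schema)) {
      sample.forEach(writer::write);        // 3. Flush the buffered sample.
      rows.forEachRemaining(writer::write); // 4. Stream the rest directly.
    }
  }

  // Placeholder types so the sketch stands on its own.
  static final class ShreddedSchema {
    static <R> ShreddedSchema inferFrom(List<R> sample) { return new ShreddedSchema(); }
  }

  static final class FileWriter<R> implements AutoCloseable {
    FileWriter(ShreddedSchema schema) {}
    void write(R row) {}
    @Override public void close() {}
  }
}
```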
this.closed = true;

// Force initialization if lazy writer still has buffered data
if (model instanceof WriterLazyInitializable lazy) {
This is a little confusing to me. Is this for the case in which our model has buffered data but not yet put the information into a row group? I.e., we are still deciding how to shred, or something like that, but close has been called?
That's right. This handles the special case where there are only a few rows, which isn't enough to trigger the schema inference logic before the file is closed, so we need to check here whether initialization has already been done.
I updated the comment a little bit. Hope it makes sense.
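In other words, close() ends up acting as a safety net along these lines; the buffer, flags, and initializeFromBuffer() are placeholders, not the actual ParquetWriter code.

```java
// Sketch of the close-time safety net described above: if the row count never
// reached the inference threshold, force initialization before closing.
import java.util.ArrayList;
import java.util.List;

class CloseTimeInit<T> {
  private final List<T> buffer = new ArrayList<>();
  private boolean initialized = false;
  private boolean closed = false;

  void close() {
    if (closed) {
      return;
    }
    this.closed = true;
    if (!initialized) {
      // only a handful of rows arrived, so schema inference never triggered
      initializeFromBuffer();
    }
    // ... then write the footer and release resources as usual
  }

  private void initializeFromBuffer() {
    this.initialized = true;
    // placeholder: infer the shredded schema from `buffer`, then flush it
    buffer.clear();
  }
}
```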
parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java (outdated, resolved)
Force-pushed from 7758429 to d7e15a7
This change adds support for writing shredded variants in the iceberg-spark module, enabling Spark to write shredded variant data into Iceberg tables.
Ideally, this should follow the approach described in the reader/writer API proposal for Iceberg V4, where the execution engine provides the shredded writer schema before creating the Iceberg writer. This design is cleaner, as it delegates schema generation responsibility to the engine.
As an interim solution, this PR implements a writer with lazy initialization for the actual Parquet writer. It buffers a portion of the data first, derives the shredded schema from the buffered records, then initializes the Parquet writer and flushes the buffered data to the file.
The current shredding algorithm is to shred to the most common type for a field.