-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Managed Iceberg] add GiB autosharding #32612
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @m-trieu for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
|
||
static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB | ||
// Used for auto-sharding in streaming. Limits number of records per batch/file | ||
private static final int FILE_TRIGGERING_RECORD_COUNT = 100_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These constants were determined by experimentation or by looking at another sink implementation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's taken from WriteFiles:
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
Lines 149 to 152 in 6a7ffa5
// The record count and buffering duration to trigger flushing records to a tmp file. Mainly used | |
// for writing unbounded data to avoid generating too many small files. | |
public static final int FILE_TRIGGERING_RECORD_COUNT = 100000; | |
public static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB as of now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BigQuery batch loads is similar but has a greater record count limit (500,000):
Lines 120 to 126 in 6a7ffa5
// If user triggering is supplied, we will trigger the file write after this many records are | |
// written. | |
static final int FILE_TRIGGERING_RECORD_COUNT = 500000; | |
// If user triggering is supplied, we will trigger the file write after this many bytes are | |
// written. | |
static final int DEFAULT_FILE_TRIGGERING_BYTE_COUNT = | |
AsyncWriteChannelOptions.UPLOAD_CHUNK_SIZE_DEFAULT; // 64MiB as of now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be a good idea in a follow up PR to expose record and byte count, in case the user wants more flexibility. Also not sure if we want this current default of 100000 to be different from the old default of TRIGGERING_RECORD_COUNT=50,000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for ManagedIO in general, it might be good to limit the number of knobs we expose. The idea is for Beam/runner to find reasonable optimal values and manage it on behalf of users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to not exposing it (at least not from the get-go)
Also not sure if we want this current default of 100000 to be different from the old default of TRIGGERING_RECORD_COUNT=50,000
Before a recent PR (#32451), the old default actually wasn't used anywhere. This IO is still pretty new and we haven't stress tested it yet to see what's most optimal. I figured a good starting point would be to follow WriteFiles (100,000) because it's essentially the same function.
BTW this should block the release IMO since it's an update incompatible change on top of another unreleased update incompatible change. |
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
Show resolved
Hide resolved
Let us update CHANGES.md |
Does this introduce an additional shuffle (and if so, are we OK with that)? |
@robertwb does GroupIntoBatches count as a shuffle? if so then yeah that's the cost here I think it's a better alternative than writing a huge amount of small files though -- the difference is pretty noticeable. We also use GroupIntoBatches for other performant IOs (all BigQueryIO writes, FileIO, TextIO). Specifically for Iceberg, the table format's query planning is sensitive to the number of files. Some references: [1], [2], [3]. I was thinking of instead adding a step (after file writes) that merges files together. Iceberg does provide a Spark operation that merges files across the entire table (compaction), but I couldn't find anything more light-weight. |
I don't think Beam "GroupIntoBatches" introduces a shuffle but I suspect Dataflow would introduce a shuffle/re-shard to make auto-sharding work (not sure how costly that is). Agree with Ahmed that benefits here seems to outweigh the associated cost. We did something very similar to BQ streaming sink to reduce the number of output streams (went from a manually configured 50 shards to auto-sharding). In practice I think, either we would introduce the sharding here or customers would have to add that manually to their pipelines. I prefer the former. |
OK, we can go with that. |
* [Managed Iceberg] add GiB autosharding * trigger iceberg integration tests * fix test * add to CHANGES.md * increase GiB limits * increase GiB limits * data file size distribution metric; max file size 512mb
* [Managed Iceberg] add GiB autosharding * trigger iceberg integration tests * fix test * add to CHANGES.md * increase GiB limits * increase GiB limits * data file size distribution metric; max file size 512mb
Adds auto-sharding to Iceberg streaming writes using GroupIntoBatches.
In streaming writes, bundles are often very small and can even be single-elements. We write each bundle to a file, so this can behavior can lead to many small files.
To solve this, we group records into batches set by a triggering frequency (as well as record and byte size limits). Now, the number of written data files is more easily controlled. Essentially, every triggering frequency duration, roughly N data files are written, where N is the number of concurrent DoFns. To decrease the number of written files, one can increase their triggering frequency or reduce their parallelism.