Increase weight of splits with delete files in iceberg #23058

Merged: 1 commit merged into trinodb:master on Aug 18, 2024

Conversation

raunaqmorarka (Member)

Description

Splits with deletes require more work to process than a comparable split without deletes.
This should be accounted for in the Iceberg split weight.

Additional context and related issues

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

pettyjamesm (Member) left a comment:

Makes sense to me, approved

raunaqmorarka (Member, Author):

> Makes sense to me, approved

One thing I'm wondering about: are we allowed to use weights above 1.0?
I'm thinking that when 2 splits are at the full target split size, but one has deletes attached to it, then that one should have a higher weight. But expressing that requires going over the current limit of 1.0.
Would everything work as expected if I simply increased the allowed range of split weight from [0, 1] to [0, 2]?

```java
double weight = dataWeight;
if (task.deletes().stream().anyMatch(deleteFile -> deleteFile.content() == POSITION_DELETES)) {
    // Presence of each data position is looked up in a combined bitmap of deleted positions
    weight += dataWeight;
}
```
alexjo2144 (Member) commented on Aug 16, 2024:

Doubling the weight whenever any positional deletes are present seems a bit heavy. There's also the length of the delete files, which we could use to approximate the additional work from them.

Member:

It does seem heavy, but since the calculation is clamped to [minimumAssignedSplitWeight, 1.0], this really only matters if the split is small in the first place. In practice, this will turn a split that would be "half-weighted" by its data size into a "whole-weighted" one.

As a worked example, assuming the default value of 128MiB for read.split.target-size, this would make a 64MiB scan with positional deletes be considered equal to a 128MiB scan without them, or a 32MiB scan with them equal to a 64MiB scan without them.
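
For illustration, here is a minimal sketch of that weighting and clamping, not the actual Trino code; the 128 MiB target and the doubling for positional deletes come from this discussion, while splitWeight and minimumWeight are hypothetical names and the minimum is left as a parameter rather than a claimed default:

```java
// Minimal sketch under the assumptions noted above, not Trino's implementation.
static double splitWeight(long dataSizeBytes, boolean hasPositionDeletes, double minimumWeight)
{
    double targetSplitSizeBytes = 128L * 1024 * 1024; // read.split.target-size default
    double dataWeight = dataSizeBytes / targetSplitSizeBytes;
    // Double the weight when the split has positional deletes attached
    double weight = hasPositionDeletes ? 2 * dataWeight : dataWeight;
    // Clamp to [minimumWeight, 1.0], as described in the comment above
    return Math.min(Math.max(weight, minimumWeight), 1.0);
}
```

With minimumWeight = 0.05, a 64 MiB split with deletes comes out at 1.0 (the same as a 128 MiB split without them), and a 32 MiB split with deletes comes out at 0.5 (the same as a 64 MiB split without them).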

raunaqmorarka (Member, Author):

The length of the delete file wouldn't work well, because compression/encoding will reduce it to a small size. The presence of any delete position results in significant extra work: read the delete file, convert it to a roaring bitmap, produce a row-positions block, look up every row position in the bitmap, and produce a filtered page.
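
As a rough, hypothetical sketch of those steps (not Trino's actual classes; a plain java.util.Set stands in for the combined roaring bitmap, and the method names are made up), the extra per-split work looks roughly like this:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only; names and types are hypothetical, not Trino's.
class PositionDeleteSketch
{
    // Read the delete file(s) and build a combined set of deleted row positions
    // (a Set stands in for the roaring bitmap used in practice).
    static Set<Long> loadDeletedPositions(Iterable<long[]> deleteFileContents)
    {
        Set<Long> deleted = new HashSet<>();
        for (long[] positions : deleteFileContents) {
            for (long position : positions) {
                deleted.add(position);
            }
        }
        return deleted;
    }

    // Look up every row position produced by the data file and keep only the
    // rows that were not deleted, i.e. produce the filtered page.
    static long[] filterPositions(long[] rowPositions, Set<Long> deleted)
    {
        return Arrays.stream(rowPositions)
                .filter(position -> !deleted.contains(position))
                .toArray();
    }
}
```

Even in this simplified form, every data row incurs a lookup on top of the normal scan, which is why the presence of any positional delete is treated as roughly doubling the work.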

pettyjamesm (Member):

> One thing I'm wondering about: are we allowed to use weights above 1.0? I'm thinking that when 2 splits are at the full target split size, but one has deletes attached to it, then that one should have a higher weight. But expressing that requires going over the current limit of 1.0. Would everything work as expected if I simply increased the allowed range of split weight from [0, 1] to [0, 2]?

This was discussed in #12832. At the end of that conversation, @sopel39 pointed out that the behavior present in UniformNodeSelector at the time, which tries to balance out split assignments across nodes, could infinite-loop if splits were weighted above 1.0, but I don't think that logic is there anymore (or at least I couldn't find it).

That said, I don't know that you really need to express weights larger than "standard". The mechanism is really there to make sure that workers have enough splits to stay busy between scheduling batches, but as a mechanism it can't guarantee that CPU load is balanced, for a number of reasons. Basically, it's important to capture "this work is cheap, send more splits to make sure the worker can stay busy", while "this work is expensive and will take a long time" is handled by the scheduler backing off and waiting for the worker split queue to drain.

raunaqmorarka merged commit 5286f64 into trinodb:master on Aug 18, 2024 (42 checks passed).
raunaqmorarka deleted the ice-delete-wt branch on August 18, 2024, 04:53.
The github-actions bot added this to the 455 milestone on Aug 18, 2024.
sopel39 (Member) commented Aug 19, 2024:

@raunaqmorarka how did you find the issue?

raunaqmorarka (Member, Author):

> @raunaqmorarka how did you find the issue?

We had a customer issue where we encountered the following:

```
io.trino.spi.TrinoException: unexpected error calling sendUpdate()
at io.trino.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:739)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalArgumentException: io.trino.server.TaskUpdateRequest could not be converted to JSON
at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:220)
at io.trino.server.remotetask.HttpRemoteTask.sendUpdateInternal(HttpRemoteTask.java:778)
at io.trino.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:736)
... 4 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Maximum Java array size (2GB) exceeded by `ByteArrayBuilder` (through reference chain: io.trino.server.TaskUpdateRequest["splitAssignments"]->com.google.common.collect.SingletonImmutableList[0]->io.trino.execution.SplitAssignment["splits"]->com.google.common.collect.RegularImmutableSet[736]->io.trino.execution.ScheduledSplit["split"]->io.trino.metadata.Split["connectorSplit"]->io.trino.plugin.iceberg.IcebergSplit["deletes"]->com.google.common.collect.RegularImmutableList[6565]->io.trino.plugin.iceberg.delete.DeleteFile["path"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:778)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:653)
at io.trino.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:119)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:422)
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1570)
at com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1275)
at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1165)
at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:217)
... 6 more
Caused by: java.lang.IllegalStateException: Maximum Java array size (2GB) exceeded by `ByteArrayBuilder`
at com.fasterxml.jackson.core.util.ByteArrayBuilder._allocMore(ByteArrayBuilder.java:317)
at com.fasterxml.jackson.core.util.ByteArrayBuilder.write(ByteArrayBuilder.java:287)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2210)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator.writeString(UTF8JsonGenerator.java:528)
at com.fasterxml.jackson.databind.ser.std.StringSerializer.serialize(StringSerializer.java:41)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
... 38 more
```

This table had a lot of small splits with many delete files. Increasing the split weight for splits with delete files should help reduce the chances of this occurring. An alternative is to manually lower max_unacknowledged_splits_per_task.

sopel39 (Member) commented Aug 19, 2024:

How does `Maximum Java array size (2GB) exceeded by ByteArrayBuilder` relate to delete files?

raunaqmorarka (Member, Author):

> How does `Maximum Java array size (2GB) exceeded by ByteArrayBuilder` relate to delete files?

There were a lot of splits with many delete files, and these contribute to the size of the task update JSON. Increasing split weights is a way to force smaller task updates. Also, we should queue fewer splits with deletes, since we know these are not as cheap as "normal" splits.

sopel39 (Member) commented Aug 19, 2024:

> There were a lot of splits with many delete files, and these contribute to the size of the task update JSON

There is a mechanism in HttpRemoteTask that prevents requests from getting too big (io.trino.server.remotetask.HttpRemoteTask#adjustSplitBatchSize). Didn't it work?

sopel39 (Member) commented Aug 19, 2024:

Also, there might just be a lot of small splits without delete files. Isn't that a problem too?

Dith3r (Member) commented Aug 22, 2024:

> There were a lot of splits with many delete files, and these contribute to the size of the task update JSON
>
> There is a mechanism in HttpRemoteTask that prevents requests from getting too big (io.trino.server.remotetask.HttpRemoteTask#adjustSplitBatchSize). Didn't it work?

Splits are serialized before the adjustment in order to get the initial size of the body. As a fallback, we could estimate the size from the splits' in-memory Java representation multiplied by some factor.
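
As a hedged sketch of that fallback idea (illustrative only, not the existing io.trino.server.remotetask.HttpRemoteTask#adjustSplitBatchSize logic; SplitInfo, the method name, and the expansion factor are hypothetical), the estimation could look something like this:

```java
import java.util.List;

// Hypothetical stand-in for a split's in-memory footprint.
record SplitInfo(long retainedSizeInBytes) {}

class SplitBatchEstimator
{
    // Pick how many splits fit under maxRequestBytes, estimating each split's JSON
    // footprint as its retained in-memory size multiplied by an assumed expansion factor.
    static int adjustedBatchSize(List<SplitInfo> splits, double jsonExpansionFactor, long maxRequestBytes)
    {
        long estimatedBytes = 0;
        int count = 0;
        for (SplitInfo split : splits) {
            estimatedBytes += (long) (split.retainedSizeInBytes() * jsonExpansionFactor);
            if (count > 0 && estimatedBytes > maxRequestBytes) {
                break; // stop before exceeding the limit, but keep at least one split
            }
            count++;
        }
        return count;
    }
}
```

The expansion factor would have to be chosen conservatively, so that underestimating the serialized size does not reproduce the oversized-request problem.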

sopel39 (Member) commented Aug 23, 2024:

> Splits are serialized before the adjustment in order to get the initial size of the body

But why do they take 2GB in the first place? It seems like that might kill coordinator <-> worker communication anyway.

We could also have a slow-start mechanism (with exponential ramp-up).

Dith3r (Member) commented Aug 24, 2024:

It is related to the Iceberg delete file data included at the split level. The adjustment could kick in once we get a first body size estimate, or on failure fall back to the minimum guaranteed number of splits per task.

sopel39 (Member) commented Aug 26, 2024:

OK, but how do we end up with 2GB of serialized data in the first place? Is there something terribly wrong with the delete file serde?

Labels: cla-signed, iceberg