
feat: Modified parquet decompression from buffered to streaming operation #5712

Merged: 10 commits into deephaven:main on Jul 12, 2024

Conversation

malhotrashivam (Contributor) commented on Jul 3, 2024

This helps reduce memory consumption when reading parquet files by almost 30%.
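
For context, a minimal sketch of the difference between the two approaches (illustrative names and a JDK codec stand-in, not the actual Deephaven parquet code): the buffered path materializes the entire uncompressed page into a byte[] up front, while the streaming path hands the reader an InputStream and lets it pull bytes incrementally, so only the codec's working buffer stays resident.

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public final class DecompressionSketch {

    // Buffered: the whole uncompressed page is allocated and filled before the
    // reader sees any of it.
    static byte[] decompressBuffered(final InputStream compressedPage, final int uncompressedSize)
            throws IOException {
        try (final InputStream in = new GZIPInputStream(compressedPage)) {
            return in.readNBytes(uncompressedSize);
        }
    }

    // Streaming: the reader consumes bytes as it decodes, so peak memory is the
    // codec's internal buffer rather than the full uncompressed page.
    static InputStream decompressStreaming(final InputStream compressedPage) throws IOException {
        return new GZIPInputStream(compressedPage);
    }
}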

malhotrashivam added the feature request, parquet, NoDocumentationNeeded, and ReleaseNotesNeeded labels on Jul 3, 2024
malhotrashivam added this to the 0.36.0 milestone on Jul 3, 2024
malhotrashivam self-assigned this on Jul 3, 2024
malhotrashivam changed the title from "Moved parquet decompression from buffered to streaming operation" to "feat: Moved parquet decompression from buffered to streaming operation" on Jul 3, 2024
malhotrashivam changed the title from "feat: Moved parquet decompression from buffered to streaming operation" to "feat: Modified parquet decompression from buffered to streaming operation" on Jul 3, 2024
extensions/parquet/compression/build.gradle (review thread, outdated, resolved)
Comment on lines 60 to 63:

final InputStream decompressedInput =
        super.decompress(bufferedInputStream, compressedSize, uncompressedSize, decompressorCache);
final ByteBuffer decompressedBuffer =
        CompressorAdapter.readNBytes(decompressedInput, uncompressedSize, new byte[uncompressedSize]);
devinrsmith (Member):

It seems very sad that we need to do the full decompression to figure out whether it's LZ4 or LZ4_RAW.

We should either handle the LZ4/LZ4_RAW fallback at a higher layer (so we don't need to materialize everything at once into a ByteBuffer), or we should have a specialized InputStream that can do the reset() + fallback internally without the all-at-once read. Is there a hard limit on how many bytes it takes to fail on an LZ4_RAW payload mislabelled as LZ4?
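
A rough sketch of the "specialized InputStream" idea, under the assumption that a mislabelled page fails on its very first read; the class name and the codec-factory parameters are hypothetical, not part of this PR:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;

// Defers the LZ4 vs LZ4_RAW decision to the first read, using mark/reset on the
// raw compressed bytes instead of materializing the whole uncompressed page.
final class FallbackInputStream extends InputStream {
    private final BufferedInputStream source;
    private final Function<InputStream, InputStream> primary;  // e.g. framed LZ4
    private final Function<InputStream, InputStream> fallback; // e.g. LZ4_RAW
    private InputStream delegate;

    FallbackInputStream(final InputStream compressed, final int compressedSize,
            final Function<InputStream, InputStream> primary,
            final Function<InputStream, InputStream> fallback) {
        this.source = new BufferedInputStream(compressed, compressedSize);
        this.source.mark(compressedSize); // allow a full rewind of the compressed page
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    public int read() throws IOException {
        if (delegate == null) {
            try {
                final InputStream candidate = primary.apply(source);
                final int first = candidate.read(); // a mislabelled page is assumed to fail here
                delegate = candidate;
                return first;
            } catch (IOException | RuntimeException e) {
                // Caveat, and exactly the open question above: this only works if the
                // failure surfaces on the first read; a later failure cannot be rewound
                // without buffering the already-decompressed output.
                source.reset();
                delegate = fallback.apply(source);
            }
        }
        return delegate.read();
    }

    @Override
    public void close() throws IOException {
        if (delegate != null) {
            delegate.close();
        }
        source.close();
    }
}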

rcaudy (Member):

I agree with @devinrsmith in principle, but:

  1. At least it's only the first read.
  2. I don't see how moving it up a layer helps, since that would imply that every caller needs to handle the fallback.
  3. I don't see how moving it down into a wrapped InputStream is better than this, since we'd need to write more code in order to present the InputStream interface, and I worry about edge cases where we can read some bytes but not all.

malhotrashivam (Contributor, Author):

I don't think there is an exact limit that I can check here, so I'm just checking for a failure.
And yeah, I couldn't find an easy way to do this without making a bigger change; as Ryan said, this extra double buffering will only happen once.

Member:

Maybe there is room for a better implementation (a unified InputStream) in the future. Or some way for the user to disable this fallback logic, so that if they trust their parquet file when it says LZ4, they don't have to pay this extra buffering cost.
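
As a purely hypothetical illustration of that opt-out (not an existing Deephaven option), the call site could branch on a trust flag so that trusted LZ4 input never pays the mark/reset buffering the fallback path needs; this reuses the FallbackInputStream sketch above.

import java.io.InputStream;
import java.util.function.Function;

final class Lz4FallbackPolicy {
    // Hypothetical knob: callers that trust the file's LZ4 labelling skip the
    // fallback wrapper (and the extra first-read buffering it requires).
    static InputStream wrap(final InputStream compressed, final int compressedSize,
            final Function<InputStream, InputStream> lz4,
            final Function<InputStream, InputStream> lz4Raw,
            final boolean trustLz4Label) {
        if (trustLz4Label) {
            return lz4.apply(compressed);
        }
        return new FallbackInputStream(compressed, compressedSize, lz4, lz4Raw);
    }
}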

rcaudy previously approved these changes on Jul 11, 2024
devinrsmith (Member) left a comment:

It's tough to review and be confident that we are handling things "correctly" in all cases. Looks OK. Need to fix the conflict with main.

devinrsmith previously approved these changes on Jul 12, 2024
rcaudy (Member) left a comment:

.

malhotrashivam merged commit f8b5e19 into deephaven:main on Jul 12, 2024
16 checks passed
github-actions bot locked and limited conversation to collaborators on Jul 12, 2024