-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-46990][SQL] Fix loading empty Avro files emitted by event-hubs #45578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
HyukjinKwon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // In some cases of empty blocks in an Avro file, `fileReader.hasNext()` returns false but | ||
| // advances the cursor to the next block, so we need to call `fileReader.hasNext()` again | ||
| // to correctly report whether the next record exists. | ||
| val r = (fileReader.hasNext || fileReader.hasNext) && !fileReader.pastSync(stopPosition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| val r = (fileReader.hasNext || fileReader.hasNext) && !fileReader.pastSync(stopPosition) | |
| val noMoreData = (fileReader.hasNext || fileReader.hasNext) && !fileReader.pastSync(stopPosition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I updated the code but I used moreData variable instead and if (!moreData).
| def hasNextRow: Boolean = { | ||
| while (!completed && currentRow.isEmpty) { | ||
| if (fileReader.pastSync(stopPosition)) { | ||
| // In some cases of empty blocks in an Avro file, `fileReader.hasNext()` returns false but |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain what's the "some cases" here. It looks weird to call hasNext twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be a bug in Avro. When blockRemaining can be 0, hasNext tries to load the next block but still checks if blockRemaining != 0 returning false when the next block is actually available.
The Avro FileReader API is limited and the only thing I could do is to just try to call hasNext again - seems to work for all of the tests cases including empty blocks.
You are right, ideally we should just loop over hasNext until it actually returns false or we reach EOF. I tried to implement it but I could not because FileReader does not expose the current stream offset (tell() actually returns the block start which is different).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have updated the comments to elaborate on this a bit more.
|
Yes, to be honest, I could not find a better solution that would fix empty file and the empty blocks issue. I think this should probably be fixed in Avro. @gengliangwang @cloud-fan How possible is it to fix in Avro and then upgrade the dependency in Spark? Thanks. |
|
I think we can merge this fix first. And we can always revisit the code change here if Avro releases your fix. Thanks for the works! |
| // In the case of empty blocks in an Avro file, `blockRemaining` could still read as 0 so | ||
| // `fileReader.hasNext()` returns false but advances the cursor to the next block, so we | ||
| // need to call `fileReader.hasNext()` again to correctly report if the next record | ||
| // exists. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm, what if there are continuous empty blocks? how many times we need to call hasNext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I was thinking but I could not find an example that double hasNext has not fixed. So it seems to be fine. I just could not find a better way of solving this issue at the moment given the FileReader interface.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
|
thanks, merging to master! |
What changes were proposed in this pull request?
This PR fixes a regression introduced by SPARK-46633, commit: 3a6b9ad where one could not read an empty Avro file as the reader would be stuck in an infinite loop.
I reverted the reader code to the pre-SPARK-46633 version and updated handling for empty blocks. When reading empty blocks in Avro,
blockRemainingcould still be read as 0. Call tohasNextstatus would load the next block but would still return false because of the final checkblockRemaining != 0. Calling the method again picks up the next non-empty block and seems to fix the issue.Why are the changes needed?
Fixes a regression introduced in SPARK-46633.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
I added a unit test to verify that empty files can be read correctly.
Was this patch authored or co-authored using generative AI tooling?
No.