[SPARK] Spark parquet read timestamp without timezone #2282
Conversation
Thanks so much for writing this up! I'll take a look soon
```java
    return TimestampType$.MODULE$;
  }
  throw new UnsupportedOperationException(
      "Spark does not support timestamp without time zone fields");
```
Could we do the flag check here too, to verify whether the "handle timestamp without timezone" flag is enabled? We may be using this off the read path (like in the migrate/snapshot code), and it would be good to catch it here as well and make sure users know what is happening.
I think this might involve a bigger refactor, including changing the method signature to accept the flag, e.g. .primitive(..., handleTimestampWithoutTimezoneFlag). I'm not sure if that would break other things. I added the logic as comments for now, showing how to implement it once we settle on the approach.
I think that's fair, I can always fix this in the migrate code directly
```java
private static List<Row> read(String table, boolean vectorized, String select0, String... selectN) {
  Dataset<Row> dataset = spark.read().format("iceberg")
      .option("vectorization-enabled", String.valueOf(vectorized))
      .option("read-timestamp-without-zone", "true")
```
Can we add in a test for the error message and exception when this flag is not true?
ah nvm, I see the test above
```java
// is adjusted so that the corresponding time in the reader timezone is displayed.
// When set to false (default), we throw an exception at runtime
// "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
this.readTimestampWithoutZone = options.get("read-timestamp-without-zone").map(Boolean::parseBoolean).orElse(false);
```
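The pattern in this snippet, reading an optional boolean flag with a default, can be sketched in isolation. A minimal self-contained version, using a plain `Map` in place of Spark's options object (class name and behavior here are illustrative, not the actual Iceberg code):

```java
import java.util.Map;
import java.util.Optional;

// Minimal sketch of the option-parsing pattern above. A plain Map stands in
// for Spark's DataSourceOptions; the class name is illustrative.
class ReadOptions {
  private final Map<String, String> options;

  ReadOptions(Map<String, String> options) {
    this.options = options;
  }

  // Mirrors options.get(key).map(Boolean::parseBoolean).orElse(defaultValue)
  boolean getBoolean(String key, boolean defaultValue) {
    return Optional.ofNullable(options.get(key))
        .map(Boolean::parseBoolean)
        .orElse(defaultValue);
  }
}
```

Note that `Boolean.parseBoolean` treats any value other than (case-insensitive) "true" as false, so a typo in the option value silently disables the flag.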
Maybe I'm being silly, but I would like "spark" to be in this flag, like "spark handle timestamp without zone", and have it cover both reading and writing.
addressed it
```java
private StructType lazyType() {
  if (type == null) {
    Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()),
        "Spark does not support timestamp without time zone fields");
```
I would like this to be a bit more elaborate. Something like
"Cannot handle timestamp without timezone fields in Spark. Spark does not natively support this type but if you would like to handle all timestamps as timestamp with timezone set 'flag name' to true. This will not change the underlying values stored but will change their displayed values in Spark. For more information see ... some website reference?"
I have been looking for a website to use as a reference in the error message, and I'm having a hard time finding a specific page that talks about Spark not supporting timestamp without timezone fields. Could I please get some pointers on where to look? I saw some relevant information on this site:
https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html
https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and-spark-sql-timestamps - That is probably enough info
addressed it
```java
// is adjusted so that the corresponding time in the reader timezone is displayed.
// When set to false (default), we throw an exception at runtime
// "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
this.readTimestampWithoutZone = options.getBoolean("read-timestamp-without-zone", false);
```
We should probably have the flag name as a constant somewhere, maybe in the Spark Util class?
addressed it
```java
public StructType readSchema() {
  if (readSchema == null) {
    Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema),
        "Spark does not support timestamp without time zone fields");
```
Same comment as above on the error message, since it's gonna be in a few places we should probably have the error message as a constant too.
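As a sketch of what the shared constants could look like, assuming a utility class along the lines discussed above (class name, flag name, and wording here are illustrative, not the actual Iceberg API):

```java
// Sketch: keep the flag name and error message as constants in one place so
// the read path, write path, and migrate code all reference the same strings.
final class SparkUtilSketch {
  // Flag name taken from the option discussed later in this thread.
  static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
      "spark-handle-timestamp-without-timezone";

  // Error message along the lines Russell suggested; wording is illustrative.
  static final String TIMESTAMP_WITHOUT_TIMEZONE_ERROR = String.format(
      "Cannot handle timestamp without timezone fields in Spark. "
          + "Spark does not natively support this type but if you would like to handle all "
          + "timestamps as timestamp with timezone set '%s' to true. "
          + "This will not change the underlying values stored but will change their "
          + "displayed values in Spark.",
      HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);

  private SparkUtilSketch() {
  }
}
```

With the constants in place, every `Preconditions.checkArgument(...)` call and every test expecting the message can reference them instead of repeating string literals.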
addressed it
```java
  return new ReaderFactory(readUsingBatch ? batchSize : 0);
}

private static boolean hasTimestampWithoutZone(Schema schema) {
```
Another candidate for the public utility class
addressed it
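For illustration, the schema walk behind such a `hasTimestampWithoutZone` utility can be sketched with a toy field tree (these simplified types are stand-ins for Iceberg's `Schema`/`Types` classes, which use a visitor rather than this ad-hoc recursion):

```java
import java.util.Arrays;
import java.util.List;

// Toy model of hasTimestampWithoutZone: recursively scan nested fields for a
// timestamp type that is not adjusted to UTC ("timestamp" vs "timestamptz").
class SchemaField {
  final String name;
  final String type;            // e.g. "timestamp", "timestamptz", "string", "struct"
  final List<SchemaField> children;

  SchemaField(String name, String type, SchemaField... children) {
    this.name = name;
    this.type = type;
    this.children = Arrays.asList(children);
  }

  static boolean hasTimestampWithoutZone(SchemaField field) {
    if ("timestamp".equals(field.type)) {  // timestamp without zone
      return true;
    }
    // Recurse into structs/lists/maps so deeply nested fields are caught too.
    return field.children.stream().anyMatch(SchemaField::hasTimestampWithoutZone);
  }
}
```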
```java
@Test
public void testUnpartitionedTimestampWithoutZoneError() {
  exception.expect(IllegalArgumentException.class);
  exception.expectMessage("Spark does not support timestamp without time zone fields");
```
We have some helpers for checking exceptions are thrown, see:

```java
/**
 * A convenience method to avoid a large number of @Test(expected=...) tests
 * @param message A String message to describe this assertion
 * @param expected An Exception class that the Runnable should throw
 * @param callable A Callable that is expected to throw the exception
 */
public static void assertThrows(String message,
                                Class<? extends Exception> expected,
                                Callable callable) {
  assertThrows(message, expected, null, callable);
}
```
addressed it
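To make the suggestion concrete, here is a minimal stand-in for such a helper plus a usage example. This is a sketch of the pattern, not Iceberg's actual `AssertHelpers` implementation; the message-substring parameter is an assumption based on how the overload above is called:

```java
import java.util.concurrent.Callable;

// Sketch of an assertThrows-style test helper: run the callable, fail if no
// exception is thrown, the type is wrong, or the message lacks expected text.
class AssertHelpersSketch {
  static void assertThrows(String message,
                           Class<? extends Exception> expected,
                           String containedInMessage,
                           Callable callable) {
    try {
      callable.call();
      throw new AssertionError(message + ": no exception thrown");
    } catch (Exception e) {
      if (!expected.isInstance(e)) {
        throw new AssertionError(message + ": wrong exception type " + e.getClass());
      }
      if (containedInMessage != null && !e.getMessage().contains(containedInMessage)) {
        throw new AssertionError(message + ": message did not contain expected text");
      }
    }
  }
}
```

A test like `testUnpartitionedTimestampWithoutZoneError` could then assert both the exception type and the "timestamp without time zone" message in one call, instead of `exception.expect(...)` rule setup.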
Left a few comments! Very excited for this to finally get fixed :) Any chance you want to do the "write" pathway as well? I don't think it would be that many more changes to the code.

Also, nit: you'll probably want to fetch apache master and rebase this branch onto it to drop the "merge 1 commit" from the commits in the PR.

Yea, I can attempt to do the writes as well. Should I do it in the same PR?

You could do another if you like, but I think it's also reasonable to do it here. Up to you.

I can make the changes you mentioned and also open a new PR to comply with the project's commit history standards. I think I missed a few things that you mentioned, like not prefixing my commits with the project that the commits relate to. Should I open a new PR after making the changes and close this one? EDIT: just did what you mentioned, and was able to rebase.
Okay, I'll just do it in this one |
Force-pushed 61c0c23 to 4444d58 (…ts-as-tstz; conflicts in spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java and spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java)
@shardulm94 Hey! Could you please take a look over this too :)

@shardulm94 ping :)

@bkahloon are you still interested in this? Just checking to see whether you were going to implement the write support as well?

@RussellSpitzer yes, I'm still interested. I'll try to get it done within the next two weeks.

@bkahloon ping :)

Hi @RussellSpitzer, sorry, I had been busy with some other obligations. I will work on the write support according to your suggestions.

@bkahloon Hi! Thanks for working on this PR. My team is blocked by the absence of this feature in Spark. We would gladly help with write support. We will have to implement it in a fork anyway, but we would rather join the existing effort. So could you share the status? How can we help to speed up the progress?

Hi @daniil-dubin, thank you for following up with me. If you'd like to take over the write support from me, that would help me out. I can point you in the direction that Russell had pointed me, if you'd like.

Yes, I could do that. Please share what Russell told you.

Hi @daniil-dubin, sorry for the delay. According to @RussellSpitzer's comments, you will need to add an additional case statement handling the timestamp-without-timezone write operation. For example, for the ORC Spark writer you can add an additional statement for timestamp without timezone. Russell also mentioned that you should probably add the warning flag like we did in this PR. If you see this piece of code
Hi, I am working on write support together with @daniil-dubin. We can set the flag when reading through the DataFrame API:

```scala
spark
  .read
  .format("iceberg")
  .option("spark-handle-timestamp-without-timezone", "true")
  .load("some.table.name")
```

but we do not have the ability to set this flag when using Spark SQL:

```scala
spark.sql("select * from some.table.name") // <- will fail in current implementation
```

Our proposal is to move this flag to a Spark session property. @bkahloon @RussellSpitzer, what do you think about it?
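The resolution order implied by that proposal, per-read option first, then the session property, then a default of false, could be sketched as follows. Plain `Map`s stand in for Spark's read options and session conf here; the method and class names are illustrative:

```java
import java.util.Map;
import java.util.Optional;

// Sketch of flag resolution: an explicit read option wins over the
// session-level property, and the default is false when neither is set.
class FlagResolution {
  static final String FLAG = "spark-handle-timestamp-without-timezone";

  static boolean handleTimestampWithoutZone(Map<String, String> readOptions,
                                            Map<String, String> sessionConf) {
    return Optional.ofNullable(readOptions.get(FLAG))
        .or(() -> Optional.ofNullable(sessionConf.get(FLAG)))  // fall back to session conf
        .map(Boolean::parseBoolean)
        .orElse(false);
  }
}
```

This keeps `spark.sql(...)` working (via the session property) while still letting an individual read override the session-wide setting.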
Sounds good to me, I have no problem with this being a session parameter. Let me know when the changes are up |
@RussellSpitzer thank you for the response. The changes are ready, but I can't push to the current git branch, so I created another PR to my forked version of Iceberg. Short description about changes: @RussellSpitzer @bkahloon I would really appreciate a review.

@sshkvar Sorry, I was on vacation; feel free to just make a pull request against OSS Iceberg.

Added a review on the other pull request. Ping me when there is a new copy against apache/iceberg @sshkvar

@sshkvar Any update? Let me know when you have a PR up, or if you don't have time to handle this just let me know and I'll pick it up.

@RussellSpitzer sorry for the delay, I was fully busy with my project tasks. I will try to create a PR to Iceberg and address the comments this week.

@RussellSpitzer I have opened a new PR: #2757

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Porting over @shardulm94's code. Addresses the previous issue of bringing over LinkedIn's fork of the Iceberg code.
Original PR from Shardul: https://github.com/linkedin/iceberg/pull/48/files