Hive3/Hadoop3 upgrade for iceberg-mr and iceberg-flink - [Draft] #1455
marton-bod wants to merge 3 commits into apache:master
Conversation
Hi @marton-bod! I appreciate your taking an interest in the Iceberg project. I wanted to mention that, as far as I know, Flink does not have any requirements on the Hive version. Somebody please correct me if I'm wrong. cc @JingsongLi @openinx

Additionally, many companies are likely running on Hive 1.x or 2.x, given how long Hive has existed and how recently Hive 3 was released, relatively speaking (and how much more recently support for Hive 3 was added to these tools). There's also currently no EMR release with Flink and Hive 3, and EMR is likely a common place to be running Flink (though that's not where I run it personally, it is one of the simpler options if somebody is already on AWS).

Are we sure we want to dictate the version of Hive that is used by either Hive users or Flink users? Would it be impossible to run Iceberg as it exists currently with Hive 3, or would it just rely on potentially deprecated APIs?

I think this is a great start, but I can't help but wonder if this should be discussed on the dev (or even user) mailing list before moving forward.
  @Override
  public Object copyObject(Object o) {
-   return o == null ? null : new Date(((Date) o).getTime());
+   return o == null ? null : Date.valueOf(o.toString());
Are you sure that toString is the appropriate method to call here to get the time from the date?
toString is usually unsafe in these situations, especially for a function that takes in Object. At the least, if an object were passed in previously that could not be cast to java.sql.Date, the user would get a decent stack trace explaining that to them.
To me, it seems like the stack trace's value to developers has decreased here, but perhaps I'm not understanding the semantics of org.apache.hadoop.hive.common.type.Date.valueOf.
Additionally, I think we tend to prefer java.sql.Date as it is the more portable representation across the many query engines that need to be supported. I also believe that java.sql.Date is described in the documentation as the physical type for a logical Date. Again, somebody please correct me if I'm wrong.
Good catch!
The data object Hive uses internally for storing Date has changed, so instanceof should be used to handle the differences between the two Date implementations.
Good point! As @pvary mentioned, with Hive 3 the date and timestamp storage has changed and requires using types from the hive.common package instead of java.sql. I will fix the conversion logic to avoid calling toString.
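For illustration, an instanceof-based copy could look roughly like this (just a sketch: it assumes both java.sql.Date and Hive 3's org.apache.hadoop.hive.common.type.Date can show up here, and that the latter exposes ofEpochMilli/toEpochMilli; the actual fix may look different):

```java
public Object copyObject(Object o) {
  if (o == null) {
    return null;
  }
  if (o instanceof java.sql.Date) {
    // Hive 2 style value: copy via the millisecond value, no string round-trip
    return new java.sql.Date(((java.sql.Date) o).getTime());
  }
  if (o instanceof org.apache.hadoop.hive.common.type.Date) {
    // Hive 3 style value: copy via the epoch millis of the Hive Date type
    org.apache.hadoop.hive.common.type.Date date = (org.apache.hadoop.hive.common.type.Date) o;
    return org.apache.hadoop.hive.common.type.Date.ofEpochMilli(date.toEpochMilli());
  }
  throw new IllegalArgumentException("Unsupported date object: " + o.getClass().getName());
}
```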
(Resolved conversation on hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java)
@marton-bod has other tasks at the moment, so in the meantime I will try to answer the questions to move this process forward. Our internal tests show that the current HiveCatalog/MR implementation does not work with Hive 3, which is why the changes in this PR are needed. These are backward-compatible changes, so in my opinion this is more about adding the possibility to run the Iceberg-Hive integration and tests with Hive 3 as well, while, very importantly, keeping the Hive 2 capabilities and tests intact.
@marton-bod already started that conversation, but it was suggested to create a PR so the required changes are easier to understand. See: https://lists.apache.org/thread.html/r72e8e7f6d4bdf30e5a460b14ad4a8b0892c15d9426205288c312df16%40%3Cdev.iceberg.apache.org%3E Thanks for taking the time to check the PR!
  @Override
  public Date getPrimitiveJavaObject(Object o) {
-   return o == null ? null : Date.valueOf((LocalDate) o);
+   return o == null ? null : Date.valueOf(o.toString());
Date/time values must be converted directly and not via string. Conversions like this will lead to time zone bugs.
Good point! I will fix the conversion logic to avoid using toString. As for timezones, the expected LocalDate/LocalDateTime objects in these inspectors contain no timezone information or any offsets.
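For reference, a direct conversion could be as simple as going through the epoch day count (a sketch assuming Hive 3's Date class provides an ofEpochDay factory; not necessarily the final code):

```java
import java.time.LocalDate;
import org.apache.hadoop.hive.common.type.Date;

public Date getPrimitiveJavaObject(Object o) {
  if (o == null) {
    return null;
  }
  // Convert via the day ordinal instead of a string round-trip,
  // so no parsing and no implicit time zone handling is involved.
  return Date.ofEpochDay((int) ((LocalDate) o).toEpochDay());
}
```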
- public DateWritable getPrimitiveWritableObject(Object o) {
-   return o == null ? null : new DateWritable(DateTimeUtil.daysFromDate((LocalDate) o));
+ public DateWritableV2 getPrimitiveWritableObject(Object o) {
+   return o == null ? null : new DateWritableV2(DateTimeUtil.daysFromDate((LocalDate) o));
Can we control the type returned by the object inspector through a config or by detecting whether DateWritableV2 exists in the classpath? Then we can use the same code for both 2.x and 3.x.
Unfortunately there was a breaking change in the DateObjectInspector and TimestampObjectInspector interfaces in Hive 3. Since we are implementing these interfaces, there has to be a separate Hive 2- and Hive 3-compatible version of these implementations. I will open a new PR where the Hive 2- and Hive 3-specific parts of MR are separated into distinct modules.
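For the pieces where the interfaces did not change, the classpath-detection idea could still let a single module choose an implementation at runtime. A rough sketch (the Iceberg class and package names below are made up for illustration):

```java
// Illustrative only: pick an inspector implementation based on whether the
// Hive 3 writable type is on the classpath. The implementation class names
// used here are hypothetical.
public final class DateInspectors {

  private static final boolean HIVE3_ON_CLASSPATH = detectHive3();

  private DateInspectors() {
  }

  private static boolean detectHive3() {
    try {
      // DateWritableV2 only exists in Hive 3
      Class.forName("org.apache.hadoop.hive.serde2.io.DateWritableV2");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static Object create() {
    // Instantiate reflectively so neither implementation is linked eagerly
    String impl = HIVE3_ON_CLASSPATH
        ? "org.apache.iceberg.mr.hive3.IcebergDateObjectInspectorHive3"
        : "org.apache.iceberg.mr.hive2.IcebergDateObjectInspector";
    try {
      return Class.forName(impl).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + impl, e);
    }
  }
}
```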
    return leaf.getLiteral();
  case DATE:
-   return daysFromTimestamp((Timestamp) leaf.getLiteral());
+   return daysFromDate((Date) leaf.getLiteral());
I think we should check the type returned by getLiteral and handle that here. Then we won't need separate code for different versions.
Good point, I'll make that change.
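A sketch of what that could look like (assuming the DATE literal may come back as either a java.sql.Timestamp or a java.sql.Date depending on the Hive version; daysFromLiteral is a hypothetical helper wrapping the existing daysFromDate/daysFromTimestamp utilities):

```java
import java.sql.Date;
import java.sql.Timestamp;

// Derive the day ordinal from whichever literal type getLiteral() returns,
// instead of hard-casting to a version-specific type.
private static int daysFromLiteral(Object literal) {
  if (literal instanceof Date) {
    return daysFromDate((Date) literal);
  }
  if (literal instanceof Timestamp) {
    return daysFromTimestamp((Timestamp) literal);
  }
  throw new UnsupportedOperationException(
      "Unsupported date literal type: " + literal.getClass().getName());
}
```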
Thanks for working on this, @marton-bod! It looks to me like we can find ways to share the same code between Hive 2 and 3. That would be ideal. We'll also need to make sure we test both Hive 2 and 3 in CI.
- compile project(':iceberg-hive-metastore')
- compile project(':iceberg-spark')
+ compile project(':iceberg-hive2-metastore')
+ compile project(":iceberg-spark")
Small nit: why the change from single quotes?
Yes, it was unintentional. Thanks for the catch
Thanks a lot for your reviews! @rdblue @kbendick @pvary
The goal of this patch is to upgrade certain components (Hive and Flink) to Hive 3/Hadoop 3, while keeping the option open for other components (Spark 2/3) to stay on Hive 2/Hadoop 2.
For these latter components, I've created a separate iceberg-hive2-metastore module which they can continue using. I'm not that familiar with Gradle, so this part is very much draft-like, with a copy task copying over the source files from the iceberg-hive-metastore module and just using different versions for its dependencies. Any ideas on how to solve this better are greatly appreciated.
(Note: this has only been tested with Java 8, so I will need to look into any test failures on Java 11.)