[MINOR] Fix logical type issue for timestamp columns #17601
base: branch-0.x
Conversation
lokeshj1703 left a comment:
@linliu-code Thanks for working on this! The PR contains a few changes that are not part of https://github.com/apache/hudi/pull/14161/files. Can we add a description of how the fix works for older Hudi tables? The original PR also mentions a limitation:
However, we used the InternalSchema system to do various operations such as fix null ordering, reorder, and add columns. At the time, InternalSchema only had a single Timestamp type. When converting back to avro, this was assumed to be micros.
Is this limitation fixed for older Hudi tables?
hudi-common/src/avro/test/java/org/apache/parquet/schema/TestAvroSchemaRepair.java
public static boolean hasTimestampMillisField(Schema tableSchema) {
  switch (tableSchema.getType()) {
    case RECORD:
      for (Schema.Field field : tableSchema.getFields()) {
        if (hasTimestampMillisField(field.schema())) {
          return true;
        }
      }
      return false;
    case ARRAY:
      return hasTimestampMillisField(tableSchema.getElementType());
    case MAP:
      return hasTimestampMillisField(tableSchema.getValueType());
    case UNION:
      return hasTimestampMillisField(AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema));
    default:
      return tableSchema.getType() == Schema.Type.LONG
          && (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis
              || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis);
  }
}

/**
 * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version.
 *
 * @return true if LocalTimestampMillis is available, false otherwise
 */
public static boolean isLocalTimestampMillisSupported() {
  try {
    return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
        .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
  } catch (Exception e) {
    return false;
  }
}
It seems like these APIs are not used. Should we remove these?
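For illustration, a minimal usage sketch of the check above. The containing class (HoodieAvroUtils here) and the schema/field names are assumptions for the example, not taken from the PR.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampMillisCheckExample {
  public static void main(String[] args) {
    // A long column annotated with the timestamp-millis logical type.
    Schema millisTs = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    Schema record = SchemaBuilder.record("trip").fields()
        .requiredString("id")
        .name("event_time").type(millisTs).noDefault()
        .endRecord();
    // HoodieAvroUtils is an assumed location for the helper; this prints "true"
    // because event_time carries the timestamp-millis logical type.
    System.out.println(HoodieAvroUtils.hasTimestampMillisField(record));
  }
}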
public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldName) {
  if (StringUtils.isNullOrEmpty(fieldName)) {
    return Option.empty();
  }
  String[] parts = fieldName.split("\\.");
  for (String part : parts) {
    Schema.Field foundField = getNonNullTypeFromUnion(schema).getField(part);
    if (foundField == null) {
      throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
    }
    schema = foundField.schema();
  }
  return Option.of(getNonNullTypeFromUnion(schema));
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
  return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
}
These APIs are not used anywhere.
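For context, a small hypothetical usage sketch of these helpers. The schema, the field names, and the assumption that they live in AvroSchemaUtils are illustrative only.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.AvroSchemaUtils; // assumed package of the helpers
import org.apache.hudi.common.util.Option;

public class NestedFieldLookupExample {
  public static void main(String[] args) {
    // Record with a nested record field addressed as "driver.rating".
    Schema tripSchema = SchemaBuilder.record("trip").fields()
        .name("driver").type(SchemaBuilder.record("driver").fields()
            .requiredDouble("rating")
            .endRecord()).noDefault()
        .endRecord();
    // Resolves the dot-separated path against the (non-null) schema at each level.
    Option<Schema> ratingSchema = AvroSchemaUtils.findNestedFieldSchema(tripSchema, "driver.rating");
    Option<Schema.Type> ratingType = AvroSchemaUtils.findNestedFieldType(tripSchema, "driver.rating");
    // ratingType is Schema.Type.DOUBLE; a non-existent path such as "driver.age"
    // throws HoodieAvroSchemaException.
    System.out.println(ratingType);
  }
}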
// NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
// Only add conversions if they're available
Should we validate the fix and the added tests with Spark 2? I am not sure if CI covers it by default.
Right now we only make the conversion for Spark 3.4+.
  return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}

public static Instant nanosToInstant(long nanosFromEpoch) {
These are unused
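The body of nanosToInstant is truncated in the diff above. As a reference only, a minimal sketch of how such a conversion is commonly written with java.time.Instant (an assumption, not necessarily the PR's actual code):

public static Instant nanosToInstant(long nanosFromEpoch) {
  // Split nanoseconds since the epoch into whole seconds plus a non-negative nano adjustment;
  // floorDiv/floorMod keep the adjustment in [0, 1e9) even for negative (pre-epoch) inputs.
  long epochSeconds = Math.floorDiv(nanosFromEpoch, 1_000_000_000L);
  long nanoAdjustment = Math.floorMod(nanosFromEpoch, 1_000_000_000L);
  return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}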
Change Logs
PR #9743 added more schema evolution functionality and schema processing. However, we used the InternalSchema system for various operations such as fixing null ordering, reordering columns, and adding columns. At the time, InternalSchema had only a single Timestamp type, and when converting back to Avro it was assumed to be micros. Therefore, if the schema provider had any timestamp-millis columns, the processed schema would end up with those columns typed as micros.
PR #13711, which updated column stats with better support for logical types, fixed these schema issues as well as additional issues with handling and converting timestamps during ingestion.
This PR adds functionality to the Spark and Hive readers and writers to automatically repair affected tables.
After switching to the 1.1 binary, the affected columns undergo an evolution from timestamp-micros to timestamp-millis. This would normally be a lossy evolution that is not supported, but it is safe here because the data is actually still timestamp-millis; it is only mislabeled as micros in the Parquet and table schemas.
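To make the mislabeling concrete, a small illustrative snippet (values and names are hypothetical, not from the PR): both annotations sit on an Avro LONG, so only the logical type tells a reader how to scale the stored number.

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampMislabelExample {
  public static void main(String[] args) {
    // What the writer intended: a long holding milliseconds since the epoch.
    Schema intendedMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    // What the InternalSchema round-trip produced: the same long labeled as microseconds.
    Schema mislabeledMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
    System.out.println(intendedMillis.getLogicalType().getName());   // timestamp-millis
    System.out.println(mislabeledMicros.getLogicalType().getName()); // timestamp-micros

    long storedValue = 1_700_000_000_000L; // written as milliseconds
    System.out.println(Instant.ofEpochMilli(storedValue));                  // 2023-11-14T22:13:20Z (correct)
    System.out.println(Instant.EPOCH.plus(storedValue, ChronoUnit.MICROS)); // 1970-01-20T16:13:20Z (off by 1000x)
  }
}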
Impact
When reading from a Hudi table with the Spark or Hive reader, if the table schema has a column as timestamp-millis but the data schema has it as timestamp-micros, we assume the column is affected and read it as a millis value instead of a micros value. This correction is also applied to all readers used by the default write paths, so as a table is rewritten the Parquet files become correct. A table's latest snapshot can be fixed immediately by writing one commit with the 1.1 binary and then clustering the entire table.
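A simplified sketch of the correction described above (not the actual reader code; the method and variable names are made up): when the table schema says millis but the data schema says micros, the stored long is kept as millis instead of being scaled down.

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampReadRepairSketch {
  // Hypothetical helper: interpret a stored long according to both schemas.
  static Instant readTimestamp(long storedValue, Schema tableFieldSchema, Schema dataFieldSchema) {
    boolean tableSaysMillis = tableFieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    boolean dataSaysMicros = dataFieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros;
    if (tableSaysMillis && dataSaysMicros) {
      // Affected column: the value was written as millis and only mislabeled as micros.
      return Instant.ofEpochMilli(storedValue);
    }
    if (dataSaysMicros) {
      // Genuine micros column: scale as usual.
      return Instant.EPOCH.plus(storedValue, ChronoUnit.MICROS);
    }
    // Millis in both the table and data schemas.
    return Instant.ofEpochMilli(storedValue);
  }
}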
Risk level (write none, low medium or high below)
High. Extensive testing was done and functional tests were added.
Documentation Update
#14100
Contributor's checklist