From 972da412b3b2bbe0915bfc48fe9cb983bd452759 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Mon, 25 Aug 2025 18:30:55 +0800 Subject: [PATCH] Kudu: add kudu.timezone for TIMESTAMP conversion --- docs/src/main/sphinx/connector/kudu.md | 3 ++ .../trino/plugin/kudu/KuduClientConfig.java | 14 +++++++ .../io/trino/plugin/kudu/KuduPageSink.java | 2 +- .../io/trino/plugin/kudu/TimestampHelper.java | 42 +++++++++++++++++++ .../java/io/trino/plugin/kudu/TypeHelper.java | 4 +- 5 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TimestampHelper.java diff --git a/docs/src/main/sphinx/connector/kudu.md b/docs/src/main/sphinx/connector/kudu.md index 622525c3846..2c5b9c812b5 100644 --- a/docs/src/main/sphinx/connector/kudu.md +++ b/docs/src/main/sphinx/connector/kudu.md @@ -56,6 +56,9 @@ kudu.client.master-addresses=localhost ## Assign Kudu splits to replica host if worker and kudu share the same cluster #kudu.allow-local-scheduling = false + +## Optional timezone used for TIMESTAMP values (default: UTC) +#kudu.timezone = UTC ``` ## Kerberos support diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java index 6a0a281ec8c..43964725d8d 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java @@ -44,6 +44,7 @@ public class KuduClientConfig private String schemaEmulationPrefix = "presto::"; private Duration dynamicFilteringWaitTimeout = new Duration(0, MINUTES); private boolean allowLocalScheduling; + private String timeZone = "UTC"; @NotNull @Size(min = 1) @@ -138,6 +139,19 @@ public KuduClientConfig setDynamicFilteringWaitTimeout(Duration dynamicFiltering return this; } + @Config("kudu.timezone") + public KuduClientConfig setTimeZone(String timeZone) + { + this.timeZone = timeZone; + TimestampHelper.setTimeZoneOffset(TimestampHelper.getZoneOffset(timeZone)); + return this; + } + + public String getTimeZone() + { + return timeZone; + } + public boolean isAllowLocalScheduling() { return allowLocalScheduling; diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java index 8ce926a56a0..d14297c22b8 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java @@ -157,7 +157,7 @@ else if (DATE.equals(type)) { row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position))); } else if (TIMESTAMP_MILLIS.equals(type)) { - row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position))); + row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position)) - (TimestampHelper.getTimeZoneOffset() * 1_000)); } else if (REAL.equals(type)) { row.addFloat(destChannel, REAL.getFloat(block, position)); diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TimestampHelper.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TimestampHelper.java new file mode 100644 index 00000000000..3b368d5d4b0 --- /dev/null +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TimestampHelper.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kudu; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; + +public final class TimestampHelper +{ + private TimestampHelper() {} + + private static long timeZoneOffsetMillis; + + public static long getTimeZoneOffset() + { + return timeZoneOffsetMillis; + } + + public static void setTimeZoneOffset(long timeZoneOffsetMillis) + { + TimestampHelper.timeZoneOffsetMillis = timeZoneOffsetMillis; + } + + public static long getZoneOffset(String zone) + { + ZoneId zoneId = ZoneId.of(zone); + ZoneOffset offset = zoneId.getRules().getOffset(Instant.now()); + return offset.getTotalSeconds() * 1000L; + } +} diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java index 3c294029659..86084dc68ff 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java @@ -173,7 +173,7 @@ public static Object getJavaValue(Type type, Object nativeValue) } if (type.equals(TIMESTAMP_MILLIS)) { // Kudu's native format is in microseconds - return nativeValue; + return ((Long) nativeValue) - (TimestampHelper.getTimeZoneOffset() * 1_000); } throw new IllegalStateException("Back conversion not implemented for " + type); } @@ -215,7 +215,7 @@ public static long getLong(Type type, RowResult row, int field) return row.getInt(field); } if (type.equals(TIMESTAMP_MILLIS)) { - return truncateEpochMicrosToMillis(row.getLong(field)); + return truncateEpochMicrosToMillis(row.getLong(field)) + (TimestampHelper.getTimeZoneOffset() * 1_000); } throw new IllegalStateException("getLong not implemented for " + type); }