diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index 1c9d1f1ceba48c..b691771dafb011 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -61,6 +61,7 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d std::to_string(range.table_format_params.paimon_params.last_update_time); params["required_fields"] = join(column_names, ","); params["columns_types"] = join(column_types, "#"); + params["time_zone"] = _state->timezone(); if (range_params->__isset.serialized_table) { params["serialized_table"] = range_params->serialized_table; } diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql new file mode 100644 index 00000000000000..073d26548a0600 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql @@ -0,0 +1,15 @@ +use paimon; +create database if not exists test_paimon_spark; +use test_paimon_spark; + +SET TIME ZONE '+08:00'; + +CREATE TABLE IF NOT EXISTS t_ts_ntz ( + id INT, + ts TIMESTAMP, + ts_ntz TIMESTAMP_NTZ +) USING paimon; + +INSERT INTO t_ts_ntz VALUES + (1, CAST('2025-08-12 06:00:00+00:00' AS TIMESTAMP), CAST('2025-08-12 06:00:00' AS TIMESTAMP_NTZ)), + (2, CAST('2025-08-12 14:00:00+08:00' AS TIMESTAMP), CAST('2025-08-12 14:00:00' AS TIMESTAMP_NTZ)); \ No newline at end of file diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java index 73aa6ce8550a4b..af8a13149e1001 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java @@ -46,15 +46,17 @@ public class PaimonColumnValue implements ColumnValue { private DataGetters record; private ColumnType dorisType; private DataType dataType; + private String timeZone; public PaimonColumnValue() { } - public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType) { + public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType, String timeZone) { this.idx = idx; this.record = record; this.dorisType = columnType; this.dataType = dataType; + this.timeZone = timeZone; } public void setIdx(int idx, ColumnType dorisType, DataType dataType) { @@ -67,6 +69,10 @@ public void setOffsetRow(InternalRow record) { this.record = record; } + public void setTimeZone(String timeZone) { + this.timeZone = timeZone; + } + @Override public boolean canGetStringAsBytes() { return true; @@ -136,7 +142,8 @@ public LocalDate getDate() { public LocalDateTime getDateTime() { Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision()); if (dataType instanceof LocalZonedTimestampType) { - return LocalDateTime.ofInstant(ts.toInstant(), ZoneId.systemDefault()); + return ts.toLocalDateTime().atZone(ZoneId.of("UTC")) + .withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime(); } else { return ts.toLocalDateTime(); } @@ -157,7 +164,7 @@ public void unpackArray(List values) { InternalArray recordArray = record.getArray(idx); for (int i = 0; i < recordArray.size(); i++) { PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i, - dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType()); + dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType(), timeZone); values.add(arrayColumnValue); } } @@ -168,13 +175,13 @@ public void unpackMap(List keys, List values) { InternalArray key = map.keyArray(); for (int i = 0; i < key.size(); i++) { PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i, - dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType()); + dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType(), timeZone); keys.add(keyColumnValue); } InternalArray value = map.valueArray(); for (int i = 0; i < value.size(); i++) { PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i, - dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType()); + dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType(), timeZone); values.add(valueColumnValue); } } @@ -185,7 +192,7 @@ public void unpackStruct(List structFieldIndex, List value InternalRow row = record.getRow(idx, structFieldIndex.size()); for (int i : structFieldIndex) { values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i), - ((RowType) dataType).getFields().get(i).type())); + ((RowType) dataType).getFields().get(i).type(), timeZone)); } } } diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index d25d5f7b94f4aa..7565d028ee7a4a 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.stream.Collectors; public class PaimonJniScanner extends JniScanner { @@ -58,6 +59,7 @@ public class PaimonJniScanner extends JniScanner { private final PaimonColumnValue columnValue = new PaimonColumnValue(); private List paimonAllFieldNames; private List paimonDataTypeList; + private final String timeZone; private RecordReader.RecordIterator recordIterator = null; private final ClassLoader classLoader; private PreExecutionAuthenticator preExecutionAuthenticator; @@ -76,6 +78,8 @@ public PaimonJniScanner(int batchSize, Map params) { } paimonSplit = params.get("paimon_split"); paimonPredicate = params.get("paimon_predicate"); + this.timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); + columnValue.setTimeZone(timeZone); initTableInfo(columnTypes, requiredFields, batchSize); hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out new file mode 100644 index 00000000000000..6c4acf47ef88a9 --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_timestamp -- +1 2025-08-12T14:00 2025-08-12T06:00 +2 2025-08-12T14:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T14:00 2025-08-12T06:00 +2 2025-08-12T14:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T16:00 2025-08-12T06:00 +2 2025-08-12T16:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T16:00 2025-08-12T06:00 +2 2025-08-12T16:00 2025-08-12T14:00 + diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy new file mode 100644 index 00000000000000..68a9a06522c8f3 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_timestamp_with_time_zone", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String catalog_name = "test_paimon_timestamp_with_time_zone" + String db_name = "test_paimon_spark" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + + sql """drop catalog if exists ${catalog_name}""" + + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + sql """use `${catalog_name}`.`${db_name}`;""" + + def test_select_timestamp = { + qt_select_timestamp """ select * from t_ts_ntz order by id; """ + } + + try { + sql """ set time_zone = 'Asia/Shanghai'; """ + sql """ set force_jni_scanner = true; """ + test_select_timestamp() + sql """ set force_jni_scanner = false; """ + test_select_timestamp() + sql """ set time_zone = '+10:00'; """ + sql """ set force_jni_scanner = true; """ + test_select_timestamp() + sql """ set force_jni_scanner = false; """ + test_select_timestamp() + } finally { + sql """ unset variable time_zone; """ + sql """ set force_jni_scanner = false; """ + } + } +}