diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index e902db8bc42d0a..e79a673e894a54 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -21,6 +21,7 @@ #include #include "runtime/descriptors.h" +#include "runtime/runtime_state.h" #include "runtime/types.h" #include "vec/core/types.h" @@ -62,6 +63,7 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d std::to_string(range.table_format_params.paimon_params.last_update_time); params["required_fields"] = join(column_names, ","); params["columns_types"] = join(column_types, "#"); + params["time_zone"] = _state->timezone(); if (range_params->__isset.serialized_table) { params["serialized_table"] = range_params->serialized_table; } diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl index 9ba8987d02bf97..73de5e9b05f094 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl @@ -35,8 +35,8 @@ services: - ./data:/mnt/data - ./scripts:/mnt/scripts - ./spark-defaults.conf:/opt/spark/conf/spark-defaults.conf - - ./data/input/jars/paimon-spark-3.5-0.8.0.jar:/opt/spark/jars/paimon-spark-3.5-0.8.0.jar - - ./data/input/jars/paimon-s3-0.8.0.jar:/opt/spark/jars/paimon-s3-0.8.0.jar + - ./data/input/jars/paimon-spark-3.5-1.0.1.jar:/opt/spark/jars/paimon-spark-3.5-1.0.1.jar + - ./data/input/jars/paimon-s3-1.0.1.jar:/opt/spark/jars/paimon-s3-1.0.1.jar environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql new file mode 100644 index 00000000000000..073d26548a0600 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql @@ -0,0 +1,15 @@ +use paimon; +create database if not exists test_paimon_spark; +use test_paimon_spark; + +SET TIME ZONE '+08:00'; + +CREATE TABLE IF NOT EXISTS t_ts_ntz ( + id INT, + ts TIMESTAMP, + ts_ntz TIMESTAMP_NTZ +) USING paimon; + +INSERT INTO t_ts_ntz VALUES + (1, CAST('2025-08-12 06:00:00+00:00' AS TIMESTAMP), CAST('2025-08-12 06:00:00' AS TIMESTAMP_NTZ)), + (2, CAST('2025-08-12 14:00:00+08:00' AS TIMESTAMP), CAST('2025-08-12 14:00:00' AS TIMESTAMP_NTZ)); \ No newline at end of file diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh index 26580301dd941f..edfaf306c72106 100755 --- a/docker/thirdparties/run-thirdparties-docker.sh +++ b/docker/thirdparties/run-thirdparties-docker.sh @@ -460,11 +460,11 @@ start_iceberg() { if [[ ! -d "${ICEBERG_DIR}/data" ]]; then echo "${ICEBERG_DIR}/data does not exist" cd "${ICEBERG_DIR}" \ - && rm -f iceberg_data.zip \ - && wget -P "${ROOT}"/docker-compose/iceberg https://"${s3BucketName}.${s3Endpoint}"/regression/datalake/pipeline_data/iceberg_data.zip \ - && sudo unzip iceberg_data.zip \ + && rm -f iceberg_data*.zip \ + && wget -P "${ROOT}"/docker-compose/iceberg https://"${s3BucketName}.${s3Endpoint}"/regression/datalake/pipeline_data/iceberg_data_paimon_101.zip \ + && sudo unzip iceberg_data_paimon_101.zip \ && sudo mv iceberg_data data \ - && sudo rm -rf iceberg_data.zip + && sudo rm -rf iceberg_data_paimon_101.zip cd - else echo "${ICEBERG_DIR}/data exist, continue !" diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java index 73aa6ce8550a4b..af8a13149e1001 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java @@ -46,15 +46,17 @@ public class PaimonColumnValue implements ColumnValue { private DataGetters record; private ColumnType dorisType; private DataType dataType; + private String timeZone; public PaimonColumnValue() { } - public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType) { + public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType, String timeZone) { this.idx = idx; this.record = record; this.dorisType = columnType; this.dataType = dataType; + this.timeZone = timeZone; } public void setIdx(int idx, ColumnType dorisType, DataType dataType) { @@ -67,6 +69,10 @@ public void setOffsetRow(InternalRow record) { this.record = record; } + public void setTimeZone(String timeZone) { + this.timeZone = timeZone; + } + @Override public boolean canGetStringAsBytes() { return true; @@ -136,7 +142,8 @@ public LocalDate getDate() { public LocalDateTime getDateTime() { Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision()); if (dataType instanceof LocalZonedTimestampType) { - return LocalDateTime.ofInstant(ts.toInstant(), ZoneId.systemDefault()); + return ts.toLocalDateTime().atZone(ZoneId.of("UTC")) + .withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime(); } else { return ts.toLocalDateTime(); } @@ -157,7 +164,7 @@ public void unpackArray(List values) { InternalArray recordArray = record.getArray(idx); for (int i = 0; i < recordArray.size(); i++) { PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i, - dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType()); + dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType(), timeZone); values.add(arrayColumnValue); } } @@ -168,13 +175,13 @@ public void unpackMap(List keys, List values) { InternalArray key = map.keyArray(); for (int i = 0; i < key.size(); i++) { PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i, - dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType()); + dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType(), timeZone); keys.add(keyColumnValue); } InternalArray value = map.valueArray(); for (int i = 0; i < value.size(); i++) { PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i, - dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType()); + dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType(), timeZone); values.add(valueColumnValue); } } @@ -185,7 +192,7 @@ public void unpackStruct(List structFieldIndex, List value InternalRow row = record.getRow(idx, structFieldIndex.size()); for (int i : structFieldIndex) { values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i), - ((RowType) dataType).getFields().get(i).type())); + ((RowType) dataType).getFields().get(i).type(), timeZone)); } } } diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index e6c04a0a2f72b8..130b2224c5b841 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.stream.Collectors; public class PaimonJniScanner extends JniScanner { @@ -74,6 +75,7 @@ public class PaimonJniScanner extends JniScanner { private long tblId; @Deprecated private long lastUpdateTime; + private final String timeZone; private RecordReader.RecordIterator recordIterator = null; private final ClassLoader classLoader; private PreExecutionAuthenticator preExecutionAuthenticator; @@ -98,6 +100,8 @@ public PaimonJniScanner(int batchSize, Map params) { dbId = Long.parseLong(params.get("db_id")); tblId = Long.parseLong(params.get("tbl_id")); lastUpdateTime = Long.parseLong(params.get("last_update_time")); + this.timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); + columnValue.setTimeZone(timeZone); initTableInfo(columnTypes, requiredFields, batchSize); paimonOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out new file mode 100644 index 00000000000000..6c4acf47ef88a9 --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_timestamp -- +1 2025-08-12T14:00 2025-08-12T06:00 +2 2025-08-12T14:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T14:00 2025-08-12T06:00 +2 2025-08-12T14:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T16:00 2025-08-12T06:00 +2 2025-08-12T16:00 2025-08-12T14:00 + +-- !select_timestamp -- +1 2025-08-12T16:00 2025-08-12T06:00 +2 2025-08-12T16:00 2025-08-12T14:00 + diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy new file mode 100644 index 00000000000000..68a9a06522c8f3 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_timestamp_with_time_zone", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String catalog_name = "test_paimon_timestamp_with_time_zone" + String db_name = "test_paimon_spark" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + + sql """drop catalog if exists ${catalog_name}""" + + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + sql """use `${catalog_name}`.`${db_name}`;""" + + def test_select_timestamp = { + qt_select_timestamp """ select * from t_ts_ntz order by id; """ + } + + try { + sql """ set time_zone = 'Asia/Shanghai'; """ + sql """ set force_jni_scanner = true; """ + test_select_timestamp() + sql """ set force_jni_scanner = false; """ + test_select_timestamp() + sql """ set time_zone = '+10:00'; """ + sql """ set force_jni_scanner = true; """ + test_select_timestamp() + sql """ set force_jni_scanner = false; """ + test_select_timestamp() + } finally { + sql """ unset variable time_zone; """ + sql """ set force_jni_scanner = false; """ + } + } +}