diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run06.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run06.sql new file mode 100644 index 00000000000000..7aa4170eab0985 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/run06.sql @@ -0,0 +1,25 @@ +use paimon; +create database if not exists test_paimon_spark; +use test_paimon_spark; + +drop table if exists test_tb_mix_format; +create table test_tb_mix_format ( + id int, + value int, + par string +) PARTITIONED BY (par) TBLPROPERTIES ( + 'primary-key' = 'id, par', + 'bucket'=1000, + 'file.format'='orc' +); +-- orc format in partition a +insert into test_tb_mix_format values (1,1,'a'),(2,1,'a'),(3,1,'a'),(4,1,'a'),(5,1,'a'),(6,1,'a'),(7,1,'a'),(8,1,'a'),(9,1,'a'),(10,1,'a'); +-- update some data, these splits will be readed by jni +insert into test_tb_mix_format values (1,2,'a'),(2,2,'a'),(3,2,'a'),(4,2,'a'),(5,2,'a'); +-- parquet format in partition b +alter table test_tb_mix_format set TBLPROPERTIES ('file.format'='parquet'); +insert into test_tb_mix_format values (1,1,'b'),(2,1,'b'),(3,1,'b'),(4,1,'b'),(5,1,'b'),(6,1,'b'),(7,1,'b'),(8,1,'b'),(9,1,'b'),(10,1,'b'); +-- update some data, these splits will be readed by jni +insert into test_tb_mix_format values (1,2,'b'),(2,2,'b'),(3,2,'b'),(4,2,'b'),(5,2,'b'); +-- delete foramt in table properties, doris should get format by file name +alter table test_tb_mix_format unset TBLPROPERTIES ('file.format'); \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java index 0b646a00b164d1..15240f103b0e51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java @@ -26,6 +26,7 @@ import com.google.common.base.Strings; import java.util.List; +import java.util.Optional; import java.util.regex.Matcher; public class FileFormatUtils { @@ -105,4 +106,18 @@ public static void parseCsvSchema(List csvSchema, String csvSchemaStr) throw new AnalysisException("invalid csv schema: " + e.getMessage()); } } + + public static Optional getFileFormatBySuffix(String filename) { + String fileString = filename.toLowerCase(); + if (fileString.endsWith(".avro")) { + return Optional.of("avro"); + } else if (fileString.endsWith(".orc")) { + return Optional.of("orc"); + } else if (fileString.endsWith(".parquet")) { + return Optional.of("parquet"); + } else { + // Unable to get file format from file path + return Optional.empty(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index ad8017a4f4c7d7..d25fa2b94be118 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -23,6 +23,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; @@ -153,7 +154,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) // use jni reader fileDesc.setPaimonSplit(encodeObjectToString(split)); } - fileDesc.setFileFormat(source.getFileFormat()); + fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString())); fileDesc.setPaimonPredicate(encodeObjectToString(predicates)); fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName()) .collect(Collectors.joining(","))); @@ -190,19 +191,18 @@ public List getSplits() throws UserException { List paimonSplits = readBuilder.withFilter(predicates) .withProjection(projected) .newScan().plan().splits(); - boolean supportNative = supportNativeReader(); // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); for (org.apache.paimon.table.source.Split split : paimonSplits) { SplitStat splitStat = new SplitStat(); splitStat.setRowCount(split.rowCount()); - if (!forceJniScanner && supportNative && split instanceof DataSplit) { + if (!forceJniScanner && split instanceof DataSplit) { DataSplit dataSplit = (DataSplit) split; BinaryRow partitionValue = dataSplit.partition(); selectedPartitionValues.add(partitionValue); Optional> optRawFiles = dataSplit.convertToRawFiles(); Optional> optDeletionFiles = dataSplit.deletionFiles(); - if (optRawFiles.isPresent()) { + if (supportNativeReader(optRawFiles)) { splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List rawFiles = optRawFiles.get(); @@ -272,15 +272,22 @@ public List getSplits() throws UserException { return splits; } - private boolean supportNativeReader() { - String fileFormat = source.getFileFormat().toLowerCase(); - switch (fileFormat) { - case "orc": - case "parquet": - return true; - default: + private String getFileFormat(String path) { + return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties()); + } + + private boolean supportNativeReader(Optional> optRawFiles) { + if (!optRawFiles.isPresent()) { + return false; + } + List files = optRawFiles.get().stream().map(RawFile::path).collect(Collectors.toList()); + for (String f : files) { + String splitFileFormat = getFileFormat(f); + if (!splitFileFormat.equals("orc") && !splitFileFormat.equals("parquet")) { return false; + } } + return true; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 44063e3226b5eb..eee9e6bad08b36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -62,7 +62,7 @@ public ExternalCatalog getCatalog() { return paimonExtTable.getCatalog(); } - public String getFileFormat() { - return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc"); + public String getFileFormatFromTableProperties() { + return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "parquet"); } } diff --git a/regression-test/data/external_table_p0/paimon/paimon_tb_mix_format.out b/regression-test/data/external_table_p0/paimon/paimon_tb_mix_format.out new file mode 100644 index 00000000000000..dca960f8c8785b --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/paimon_tb_mix_format.out @@ -0,0 +1,45 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !order -- +1 2 a +2 2 a +3 2 a +4 2 a +5 2 a +6 1 a +7 1 a +8 1 a +9 1 a +10 1 a +1 2 b +2 2 b +3 2 b +4 2 b +5 2 b +6 1 b +7 1 b +8 1 b +9 1 b +10 1 b + +-- !order -- +1 2 a +2 2 a +3 2 a +4 2 a +5 2 a +6 1 a +7 1 a +8 1 a +9 1 a +10 1 a +1 2 b +2 2 b +3 2 b +4 2 b +5 2 b +6 1 b +7 1 b +8 1 b +9 1 b +10 1 b + diff --git a/regression-test/suites/external_table_p0/paimon/paimon_tb_mix_format.groovy b/regression-test/suites/external_table_p0/paimon/paimon_tb_mix_format.groovy new file mode 100644 index 00000000000000..c8569f5c03fa68 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/paimon_tb_mix_format.groovy @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("paimon_tb_mix_format", "p0,external,doris,external_docker,external_docker_doris") { + + logger.info("start paimon test") + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + + try { + String catalog_name = "paimon_tb_mix_format" + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """use test_paimon_spark;""" + + sql """set force_jni_scanner=true""" + qt_order """ select * from test_tb_mix_format order by par,id; """ + + sql """set force_jni_scanner=false""" + qt_order """ select * from test_tb_mix_format order by par,id; """ + } finally { + sql """set force_jni_scanner=false""" + } + +} \ No newline at end of file