[bugfix](paimon)Get the file format by file name (#41020) (#41487)
wuwenchi authored Sep 30, 2024
1 parent 78b95e7 commit 4f81fc4
Showing 6 changed files with 161 additions and 13 deletions.
@@ -0,0 +1,25 @@
use paimon;
create database if not exists test_paimon_spark;
use test_paimon_spark;

drop table if exists test_tb_mix_format;
create table test_tb_mix_format (
id int,
value int,
par string
) PARTITIONED BY (par) TBLPROPERTIES (
'primary-key' = 'id, par',
'bucket'=1000,
'file.format'='orc'
);
-- orc format in partition a
insert into test_tb_mix_format values (1,1,'a'),(2,1,'a'),(3,1,'a'),(4,1,'a'),(5,1,'a'),(6,1,'a'),(7,1,'a'),(8,1,'a'),(9,1,'a'),(10,1,'a');
-- update some data, these splits will be read by jni
insert into test_tb_mix_format values (1,2,'a'),(2,2,'a'),(3,2,'a'),(4,2,'a'),(5,2,'a');
-- parquet format in partition b
alter table test_tb_mix_format set TBLPROPERTIES ('file.format'='parquet');
insert into test_tb_mix_format values (1,1,'b'),(2,1,'b'),(3,1,'b'),(4,1,'b'),(5,1,'b'),(6,1,'b'),(7,1,'b'),(8,1,'b'),(9,1,'b'),(10,1,'b');
-- update some data, these splits will be read by jni
insert into test_tb_mix_format values (1,2,'b'),(2,2,'b'),(3,2,'b'),(4,2,'b'),(5,2,'b');
-- remove format from table properties, doris should get the format from the file name
alter table test_tb_mix_format unset TBLPROPERTIES ('file.format');
@@ -26,6 +26,7 @@
import com.google.common.base.Strings;

import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;

public class FileFormatUtils {
@@ -105,4 +106,18 @@ public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr)
throw new AnalysisException("invalid csv schema: " + e.getMessage());
}
}

public static Optional<String> getFileFormatBySuffix(String filename) {
String fileString = filename.toLowerCase();
if (fileString.endsWith(".avro")) {
return Optional.of("avro");
} else if (fileString.endsWith(".orc")) {
return Optional.of("orc");
} else if (fileString.endsWith(".parquet")) {
return Optional.of("parquet");
} else {
// Unable to get file format from file path
return Optional.empty();
}
}
}
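
For illustration only (not part of the commit): a minimal sketch of how the new helper is meant to be used. The file names below are hypothetical; only the (case-insensitive) suffix matters, and callers are expected to fall back to a table-level default when the suffix is unrecognized.

// Hypothetical data-file names; getFileFormatBySuffix only inspects the suffix.
FileFormatUtils.getFileFormatBySuffix("par=a/bucket-0/data-1.orc");      // Optional.of("orc")
FileFormatUtils.getFileFormatBySuffix("par=b/bucket-0/data-2.PARQUET");  // Optional.of("parquet")
FileFormatUtils.getFileFormatBySuffix("par=b/bucket-0/changelog.bin");   // Optional.empty()
// When the suffix gives no answer, fall back to a table-level default:
String format = FileFormatUtils.getFileFormatBySuffix("changelog.bin").orElse("parquet"); // "parquet"
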
@@ -23,6 +23,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
@@ -153,7 +154,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
// use jni reader
fileDesc.setPaimonSplit(encodeObjectToString(split));
}
- fileDesc.setFileFormat(source.getFileFormat());
+ fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString()));
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(",")));
@@ -190,19 +191,18 @@ public List<Split> getSplits() throws UserException {
List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
- boolean supportNative = supportNativeReader();
// Just for counting the number of selected partitions for this paimon table
Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(split.rowCount());
- if (!forceJniScanner && supportNative && split instanceof DataSplit) {
+ if (!forceJniScanner && split instanceof DataSplit) {
DataSplit dataSplit = (DataSplit) split;
BinaryRow partitionValue = dataSplit.partition();
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles();
- if (optRawFiles.isPresent()) {
+ if (supportNativeReader(optRawFiles)) {
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
@@ -272,15 +272,22 @@ public List<Split> getSplits() throws UserException {
return splits;
}

- private boolean supportNativeReader() {
- String fileFormat = source.getFileFormat().toLowerCase();
- switch (fileFormat) {
- case "orc":
- case "parquet":
- return true;
- default:
+ private String getFileFormat(String path) {
+ return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
+ }
+
+ private boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
+ if (!optRawFiles.isPresent()) {
return false;
}
+ List<String> files = optRawFiles.get().stream().map(RawFile::path).collect(Collectors.toList());
+ for (String f : files) {
+ String splitFileFormat = getFileFormat(f);
+ if (!splitFileFormat.equals("orc") && !splitFileFormat.equals("parquet")) {
+ return false;
+ }
+ }
+ return true;
}

@Override
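
A rough sketch (not part of the commit) of the per-split rule introduced above: the native reader is used only when every raw file in the split resolves to orc or parquet; otherwise the split falls back to the JNI scanner. The paths and the "parquet" stand-in for source.getFileFormatFromTableProperties() are hypothetical, and java.util.Arrays/List imports are assumed.

// Hypothetical raw-file paths (in the real code they come from dataSplit.convertToRawFiles()).
List<String> rawFilePaths = Arrays.asList(
        "test_tb_mix_format/par=a/bucket-0/data-1.orc",
        "test_tb_mix_format/par=b/bucket-0/data-2.parquet");
boolean useNative = rawFilePaths.stream()
        .map(p -> FileFormatUtils.getFileFormatBySuffix(p).orElse("parquet")) // "parquet" stands in for the table-level default
        .allMatch(f -> f.equals("orc") || f.equals("parquet"));
// useNative == true here, so the split takes the native reader path; if any raw file
// resolved to a non-native format (e.g. avro), the whole split would be read by the JNI scanner.
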
@@ -62,7 +62,7 @@ public ExternalCatalog getCatalog() {
return paimonExtTable.getCatalog();
}

- public String getFileFormat() {
- return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
+ public String getFileFormatFromTableProperties() {
+ return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "parquet");
}
}
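
Net effect of the two changes above: the per-file suffix is consulted first, and the table-level 'file.format' option (now defaulting to parquet instead of orc) is only a fallback. A hedged sketch, with a plain java.util.Map standing in for originTable.options() and a made-up file name; java.util imports are assumed.

// Table options with 'file.format' unset, as after the ALTER TABLE ... unset in the test setup.
Map<String, String> tableOptions = new HashMap<>();
String fromProperties = tableOptions.getOrDefault("file.format", "parquet"); // "parquet"
String resolved = FileFormatUtils.getFileFormatBySuffix("par=a/bucket-0/data-1.orc").orElse(fromProperties);
// resolved == "orc": the suffix wins, so orc files written before the format switch are still read correctly.
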
@@ -0,0 +1,45 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !order --
1 2 a
2 2 a
3 2 a
4 2 a
5 2 a
6 1 a
7 1 a
8 1 a
9 1 a
10 1 a
1 2 b
2 2 b
3 2 b
4 2 b
5 2 b
6 1 b
7 1 b
8 1 b
9 1 b
10 1 b

-- !order --
1 2 a
2 2 a
3 2 a
4 2 a
5 2 a
6 1 a
7 1 a
8 1 a
9 1 a
10 1 a
1 2 b
2 2 b
3 2 b
4 2 b
5 2 b
6 1 b
7 1 b
8 1 b
9 1 b
10 1 b

@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("paimon_tb_mix_format", "p0,external,doris,external_docker,external_docker_doris") {

logger.info("start paimon test")
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disabled paimon test")
return
}

try {
String catalog_name = "paimon_tb_mix_format"
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

sql """drop catalog if exists ${catalog_name}"""
sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='paimon',
'warehouse' = 's3://warehouse/wh/',
"s3.access_key" = "admin",
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
"s3.region" = "us-east-1"
);"""

logger.info("catalog " + catalog_name + " created")
sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)
sql """use test_paimon_spark;"""

sql """set force_jni_scanner=true"""
qt_order """ select * from test_tb_mix_format order by par,id; """

sql """set force_jni_scanner=false"""
qt_order """ select * from test_tb_mix_format order by par,id; """
} finally {
sql """set force_jni_scanner=false"""
}

}
