From fa743fc0153a36437bb6bc24d93e8acdce750723 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 13 Aug 2025 17:43:50 +0800 Subject: [PATCH 01/13] clean code --- be/src/vec/exec/format/table/paimon_jni_reader.cpp | 8 -------- .../java/org/apache/doris/paimon/PaimonJniScanner.java | 4 ---- .../doris/datasource/paimon/source/PaimonScanNode.java | 10 ---------- 3 files changed, 22 deletions(-) diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index b691771dafb011..0ffbe79a94a64e 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -49,16 +49,8 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d column_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type())); } std::map params; - params["db_name"] = range.table_format_params.paimon_params.db_name; - params["table_name"] = range.table_format_params.paimon_params.table_name; params["paimon_split"] = range.table_format_params.paimon_params.paimon_split; - params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names; params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate; - params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id); - params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id); - params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id); - params["last_update_time"] = - std::to_string(range.table_format_params.paimon_params.last_update_time); params["required_fields"] = join(column_names, ","); params["columns_types"] = join(column_types, "#"); params["time_zone"] = _state->timezone(); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 
7565d028ee7a4a..002ceac3537495 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -44,13 +44,9 @@ public class PaimonJniScanner extends JniScanner { private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class); - @Deprecated - private static final String PAIMON_OPTION_PREFIX = "paimon."; - @Deprecated private static final String HADOOP_OPTION_PREFIX = "hadoop."; private final Map params; - @Deprecated private final Map hadoopOptionParams; private final String paimonSplit; private final String paimonPredicate; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 2a3df6a28776ab..a385d408b390dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -83,7 +83,6 @@ public class PaimonScanNode extends FileQueryScanNode { private static final String DORIS_START_TIMESTAMP = "startTimestamp"; private static final String DORIS_END_TIMESTAMP = "endTimestamp"; private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE = "incrementalBetweenScanMode"; - private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto"; private enum SplitReadType { JNI, @@ -213,15 +212,6 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) } fileDesc.setFileFormat(fileFormat); fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates)); - fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName()) - .collect(Collectors.joining(","))); - fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName()); - 
fileDesc.setPaimonOptions(((PaimonExternalCatalog) source.getCatalog()).getPaimonOptionsMap()); - fileDesc.setTableName(source.getTargetTable().getName()); - fileDesc.setCtlId(source.getCatalog().getId()); - fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId()); - fileDesc.setTblId(source.getTargetTable().getId()); - fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime()); // The hadoop conf should be same with // PaimonExternalCatalog.createCatalog()#getConfiguration() fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getBackendStorageProperties()); From b34785b6f76784e94307798f8da7e7e3ee9dd39c Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 11:10:06 +0800 Subject: [PATCH 02/13] change thrift --- gensrc/thrift/PlanNodes.thrift | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 3a10248b9fa005..7faf8683b66262 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -538,21 +538,23 @@ struct TDataGenScanRange { struct TIcebergMetadataParams { - 1: optional string serialized_task + 1: optional string serialized_task // deprecated 2: optional map hadoop_props + 3: optional string serialized_table } struct TPaimonMetadataParams { - 1: optional string db_name - 2: optional string tbl_name - 3: optional string query_type - 4: optional i64 ctl_id - 5: optional i64 db_id - 6: optional i64 tbl_id - 7: optional string serialized_split + 1: optional string db_name // deprecated + 2: optional string tbl_name // deprecated + 3: optional string query_type // deprecated + 4: optional i64 ctl_id // deprecated + 5: optional i64 db_id // deprecated + 6: optional i64 tbl_id // deprecated + 7: optional string serialized_split // deprecated 8: optional map hadoop_props - 9: optional map paimon_props + 9: optional map paimon_props // deprecated + 10: optional string serialized_table } struct 
THudiMetadataParams { @@ -624,6 +626,9 @@ struct TMetaScanRange { 11: optional TPartitionValuesMetadataParams partition_values_params 12: optional THudiMetadataParams hudi_params 13: optional TPaimonMetadataParams paimon_params + + // ... more params + 999: optional list splits } // Specification of an individual data range which is held in its entirety From f4e2db19186b0cf60ac031f4c17c7352ce6ac3ec Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 15:13:58 +0800 Subject: [PATCH 03/13] remove PaimonTableCache --- .../table/paimon_sys_table_jni_reader.cpp | 4 - .../paimon/PaimonSysTableJniScanner.java | 38 +-- .../apache/doris/paimon/PaimonTableCache.java | 221 ------------------ 3 files changed, 5 insertions(+), 258 deletions(-) delete mode 100644 fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp index e6b0263ed25053..147ebfba179e26 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -24,7 +24,6 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop."; -const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon."; PaimonSysTableJniReader::PaimonSysTableJniReader( const std::vector& file_slot_descs, RuntimeState* state, @@ -41,9 +40,6 @@ PaimonSysTableJniReader::PaimonSysTableJniReader( params["db_name"] = _range_params.db_name; params["tbl_name"] = _range_params.tbl_name; params["query_type"] = _range_params.query_type; - params["ctl_id"] = std::to_string(_range_params.ctl_id); - params["db_id"] = std::to_string(_range_params.db_id); - params["tbl_id"] = std::to_string(_range_params.tbl_id); params["serialized_split"] = _range_params.serialized_split; params["required_fields"] 
= join(required_fields, ","); params["required_types"] = join(required_types, "#"); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java index 0344c8b2db4194..3a708690b4c19e 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -21,8 +21,6 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; -import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; -import org.apache.doris.paimon.PaimonTableCache.TableExt; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; @@ -63,12 +61,6 @@ public class PaimonSysTableJniScanner extends JniScanner { private List paimonAllFieldNames; private final PreExecutionAuthenticator preExecutionAuthenticator; private RecordReader.RecordIterator recordIterator = null; - private final long ctlId; - private final long dbId; - private final long tblId; - private final String dbName; - private final String tblName; - private final String queryType; /** * Constructs a new PaimonSysTableJniScanner for reading Paimon system tables. 
@@ -86,13 +78,12 @@ public PaimonSysTableJniScanner(int batchSize, Map params) { columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]); } initTableInfo(columnTypes, requiredFields, batchSize); + this.table = PaimonUtils.deserialize(params.get("serialized_table")); + this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); + if (LOG.isDebugEnabled()) { + LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames); + } this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split")); - this.ctlId = Long.parseLong(params.get("ctl_id")); - this.dbId = Long.parseLong(params.get("db_id")); - this.tblId = Long.parseLong(params.get("tbl_id")); - this.dbName = params.get("db_name"); - this.tblName = params.get("tbl_name"); - this.queryType = params.get("query_type"); this.hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) .collect(Collectors @@ -115,7 +106,6 @@ public void open() { // so we need to provide a classloader, otherwise it will cause NPE. Thread.currentThread().setContextClassLoader(classLoader); preExecutionAuthenticator.execute(() -> { - initTable(); initReader(); return null; }); @@ -143,24 +133,6 @@ protected int getNext() throws IOException { } } - private void initTable() { - PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, - paimonOptionParams, hadoopOptionParams, dbName, tblName, queryType); - TableExt tableExt = PaimonTableCache.getTable(key); - Table paimonTable = tableExt.getTable(); - if (paimonTable == null) { - throw new RuntimeException( - String.format( - "Failed to get Paimon system table {%s}.{%s}${%s}. 
", - dbName, tblName, queryType)); - } - this.table = paimonTable; - this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); - if (LOG.isDebugEnabled()) { - LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames); - } - } - private void initReader() throws IOException { ReadBuilder readBuilder = table.newReadBuilder(); if (this.fields.length > this.paimonAllFieldNames.size()) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java deleted file mode 100644 index e5f067af96b728..00000000000000 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java +++ /dev/null @@ -1,221 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.paimon; - -import com.google.common.base.Objects; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.hadoop.conf.Configuration; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -public class PaimonTableCache { - // Max cache num of paimon table - public static final long max_external_schema_cache_num = 50; - // The expiration time of a cache object after last access of it. - public static final long external_cache_expire_time_minutes_after_access = 100; - - private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class); - - private static LoadingCache tableCache = CacheBuilder.newBuilder() - .maximumSize(max_external_schema_cache_num) - .expireAfterAccess(external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) - .build(new CacheLoader() { - @Override - public TableExt load(PaimonTableCacheKey key) { - return loadTable(key); - } - }); - - private static TableExt loadTable(PaimonTableCacheKey key) { - try { - LOG.warn("load table:{}", key); - Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams()); - Table table; - if (key.getQueryType() != null) { - table = catalog.getTable(new Identifier(key.getDbName(), key.getTblName(), - null, key.getQueryType())); - } else { - table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName())); - } - return new TableExt(table, System.currentTimeMillis()); - } catch (Catalog.TableNotExistException e) { - LOG.warn("failed to create 
paimon table ", e); - throw new RuntimeException(e); - } - } - - private static Catalog createCatalog( - Map paimonOptionParams, - Map hadoopOptionParams) { - Options options = new Options(); - paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue())); - Configuration hadoopConf = new Configuration(); - hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue())); - CatalogContext context = CatalogContext.create(options, hadoopConf); - return CatalogFactory.createCatalog(context); - } - - public static void invalidateTableCache(PaimonTableCacheKey key) { - tableCache.invalidate(key); - } - - public static TableExt getTable(PaimonTableCacheKey key) { - try { - return tableCache.get(key); - } catch (ExecutionException e) { - throw new RuntimeException("failed to get table for:" + key); - } - } - - public static class TableExt { - private Table table; - private long createTime; - - public TableExt(Table table, long createTime) { - this.table = table; - this.createTime = createTime; - } - - public Table getTable() { - return table; - } - - public long getCreateTime() { - return createTime; - } - } - - public static class PaimonTableCacheKey { - // in key - private final long ctlId; - private final long dbId; - private final long tblId; - - // not in key - private Map paimonOptionParams; - private Map hadoopOptionParams; - private String dbName; - private String tblName; - private String queryType; - - public PaimonTableCacheKey(long ctlId, long dbId, long tblId, - Map paimonOptionParams, - Map hadoopOptionParams, - String dbName, String tblName) { - this.ctlId = ctlId; - this.dbId = dbId; - this.tblId = tblId; - this.paimonOptionParams = paimonOptionParams; - this.hadoopOptionParams = hadoopOptionParams; - this.dbName = dbName; - this.tblName = tblName; - } - - public PaimonTableCacheKey(long ctlId, long dbId, long tblId, - Map paimonOptionParams, - Map hadoopOptionParams, - String dbName, String 
tblName, String queryType) { - this.ctlId = ctlId; - this.dbId = dbId; - this.tblId = tblId; - this.paimonOptionParams = paimonOptionParams; - this.hadoopOptionParams = hadoopOptionParams; - this.dbName = dbName; - this.tblName = tblName; - this.queryType = queryType; - } - - public long getCtlId() { - return ctlId; - } - - public long getDbId() { - return dbId; - } - - public long getTblId() { - return tblId; - } - - public Map getPaimonOptionParams() { - return paimonOptionParams; - } - - public Map getHadoopOptionParams() { - return hadoopOptionParams; - } - - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } - - public String getQueryType() { - return queryType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PaimonTableCacheKey that = (PaimonTableCacheKey) o; - return ctlId == that.ctlId && dbId == that.dbId && tblId == that.tblId && Objects.equal( - queryType, - that.queryType); - } - - @Override - public int hashCode() { - return Objects.hashCode(ctlId, dbId, tblId); - } - - @Override - public String toString() { - return "PaimonTableCacheKey{" - + "ctlId=" + ctlId - + ", dbId=" + dbId - + ", tblId=" + tblId - + ", paimonOptionParams=" + paimonOptionParams - + ", hadoopOptionParams=" + hadoopOptionParams - + ", dbName='" + dbName + '\'' - + ", tblName='" + tblName + '\'' - + ", queryType='" + queryType + '\'' - + '}'; - } - } - -} From 0a1b1c06162956f7f4e05cbe63da3d162fec4c57 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 16:22:35 +0800 Subject: [PATCH 04/13] change api getMetaScanRange --- .../tvf/source/MetadataScanNode.java | 23 +++++++++---------- .../BackendsTableValuedFunction.java | 5 ++-- .../CatalogsTableValuedFunction.java | 5 ++-- .../FrontendsDisksTableValuedFunction.java | 5 ++-- .../FrontendsTableValuedFunction.java | 5 ++-- .../HudiTableValuedFunction.java | 6 
++--- .../JobsTableValuedFunction.java | 4 ++-- .../MetadataTableValuedFunction.java | 2 +- .../MvInfosTableValuedFunction.java | 5 ++-- .../PartitionValuesTableValuedFunction.java | 4 ++-- .../PartitionsTableValuedFunction.java | 5 ++-- .../TasksTableValuedFunction.java | 5 ++-- 12 files changed, 33 insertions(+), 41 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java index aff5dac01e35ca..7a91ad8c2b7c04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java @@ -73,21 +73,20 @@ protected void createScanRangeLocations() { .filter(slot -> slot.isMaterialized()) .map(slot -> slot.getColumn().getName()) .collect(java.util.stream.Collectors.toList()); - for (TMetaScanRange metaScanRange : tvf.getMetaScanRanges(requiredFileds)) { - TScanRange scanRange = new TScanRange(); - scanRange.setMetaScanRange(metaScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setMetaScanRange(tvf.getMetaScanRange(requiredFileds)); - TScanRangeLocation location = new TScanRangeLocation(); - Backend backend = backendPolicy.getNextBe(); - location.setBackendId(backend.getId()); - location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + // TODO: split ranges to be + TScanRangeLocation location = new TScanRangeLocation(); + Backend backend = backendPolicy.getNextBe(); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); - TScanRangeLocations locations = new TScanRangeLocations(); - locations.addToLocations(location); - locations.setScanRange(scanRange); + TScanRangeLocations locations = new TScanRangeLocations(); + locations.addToLocations(location); + locations.setScanRange(scanRange); - 
scanRangeLocations.add(locations); - } + scanRangeLocations.add(locations); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java index 826e1e7c59ded2..aca6779fc64f23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java @@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import java.util.List; import java.util.Map; @@ -104,13 +103,13 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.BACKENDS); TBackendsMetadataParams backendsMetadataParams = new TBackendsMetadataParams(); backendsMetadataParams.setClusterName(""); metaScanRange.setBackendsParams(backendsMetadataParams); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java index 75df6bb62210bc..35719f3fa52ebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import java.util.List; import java.util.Map; @@ -79,9 +78,9 @@ public TMetadataType getMetadataType() { } @Override - public List 
getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.CATALOGS); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java index d1b176f35a1fd5..31ddf55313a809 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java @@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import java.util.List; import java.util.Map; @@ -88,13 +87,13 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS); TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams(); frontendsMetadataParams.setClusterName(""); metaScanRange.setFrontendsParams(frontendsMetadataParams); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java index 5acf44f4d3bd10..efb90070e65fe3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java @@ -32,7 +32,6 @@ import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import java.util.List; import java.util.Map; @@ -97,13 +96,13 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.FRONTENDS); TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams(); frontendsMetadataParams.setClusterName(""); metaScanRange.setFrontendsParams(frontendsMetadataParams); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java index 70e1ec84928905..c203b07de91946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java @@ -120,7 +120,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.HUDI); // set hudi metadata params @@ -130,9 +130,9 @@ public List getMetaScanRanges(List requiredFileds) { hudiMetadataParams.setDatabase(hudiTableName.getDb()); hudiMetadataParams.setTable(hudiTableName.getTbl()); metaScanRange.setHudiParams(hudiMetadataParams); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } - + @Override public String getTableName() { return "HudiMetadataTableValuedFunction"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java index b5d0489d30c0b4..40420b979a7845 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java @@ -99,14 +99,14 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.JOBS); TJobsMetadataParams jobParam = new TJobsMetadataParams(); jobParam.setType(jobType.name()); jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift()); metaScanRange.setJobsParams(jobParam); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index 92b30b0347ba47..32be139657c3e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -60,7 +60,7 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co public abstract TMetadataType getMetadataType(); - public abstract List getMetaScanRanges(List requiredFileds); + public abstract TMetaScanRange getMetaScanRange(List requiredFileds); @Override public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java index c40a8d4716d1ed..e0c42165530a2e 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java @@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -106,7 +105,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { if (LOG.isDebugEnabled()) { LOG.debug("getMetaScanRange() start"); } @@ -119,7 +118,7 @@ public List getMetaScanRanges(List requiredFileds) { if (LOG.isDebugEnabled()) { LOG.debug("getMetaScanRange() end"); } - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java index e0a0d4dc649bc9..494a68edf3af9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java @@ -145,7 +145,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { if (LOG.isDebugEnabled()) { LOG.debug("getMetaScanRange() start"); } @@ -156,7 +156,7 @@ public List getMetaScanRanges(List requiredFileds) { partitionParam.setDatabase(databaseName); partitionParam.setTable(tableName); metaScanRange.setPartitionValuesParams(partitionParam); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java index 3ffb77cdbc6762..53f99c549a3636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java @@ -43,7 +43,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -211,7 +210,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { if (LOG.isDebugEnabled()) { LOG.debug("getMetaScanRange() start"); } @@ -225,7 +224,7 @@ public List getMetaScanRanges(List requiredFileds) { if (LOG.isDebugEnabled()) { LOG.debug("getMetaScanRange() end"); } - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java index 5d60adbeacfd45..b343a7dbeed547 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java @@ -31,7 +31,6 @@ import org.apache.doris.thrift.TTasksMetadataParams; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; @@ -99,14 +98,14 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List 
requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { TMetaScanRange metaScanRange = new TMetaScanRange(); metaScanRange.setMetadataType(TMetadataType.TASKS); TTasksMetadataParams taskParam = new TTasksMetadataParams(); taskParam.setType(jobType.name()); taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift()); metaScanRange.setTasksParams(taskParam); - return Lists.newArrayList(metaScanRange); + return metaScanRange; } @Override From e8cffd2c52a5866bf65dde38cf725c23906628c8 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 17:30:31 +0800 Subject: [PATCH 05/13] fix IcebergTableValuedFunction and PaimonTableValuedFunction --- .../IcebergTableValuedFunction.java | 27 ++++++------ .../PaimonTableValuedFunction.java | 41 ++++--------------- gensrc/thrift/PlanNodes.thrift | 33 +++++++-------- 3 files changed, 37 insertions(+), 64 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java index d9ccc392084dbf..359ad2ce57d35e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java @@ -31,12 +31,10 @@ import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.thrift.TIcebergMetadataParams; import org.apache.doris.thrift.TMetaScanRange; import org.apache.doris.thrift.TMetadataType; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.iceberg.FileScanTask; @@ -48,6 +46,8 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; 
+import java.util.stream.StreamSupport; /** * The class of table valued function for iceberg metadata. @@ -135,8 +135,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { - List scanRanges = Lists.newArrayList(); + public TMetaScanRange getMetaScanRange(List requiredFileds) { CloseableIterable tasks; try { tasks = preExecutionAuthenticator.execute(() -> { @@ -145,17 +144,15 @@ public List getMetaScanRanges(List requiredFileds) { } catch (Exception e) { throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e)); } - for (FileScanTask task : tasks) { - TMetaScanRange metaScanRange = new TMetaScanRange(); - metaScanRange.setMetadataType(TMetadataType.ICEBERG); - // set iceberg metadata params - TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams(); - icebergMetadataParams.setHadoopProps(hadoopProps); - icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task)); - metaScanRange.setIcebergParams(icebergMetadataParams); - scanRanges.add(metaScanRange); - } - return scanRanges; + + TMetaScanRange tMetaScanRange = new TMetaScanRange(); + tMetaScanRange.setMetadataType(TMetadataType.ICEBERG); + tMetaScanRange.setHadoopProps(hadoopProps); + tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable)); + tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), false) + .map(SerializationUtil::serializeToBase64) + .collect(Collectors.toList())); + return tMetaScanRange; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java index 91c131603b66de..b9a271bb13ecce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -34,7 +34,6 @@ import 
org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TMetaScanRange; import org.apache.doris.thrift.TMetadataType; -import org.apache.doris.thrift.TPaimonMetadataParams; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -60,12 +59,7 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction { private final Table paimonSysTable; private final List schema; private final Map hadoopProps; - private final Map paimonProps; private final ExecutionAuthenticator hadoopAuthenticator; - private final TableName paimonTableName; - private final long ctlId; - private final long dbId; - private final long tblId; /** * Creates a new Paimon table-valued function instance. @@ -84,18 +78,14 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog"); } - this.paimonTableName = paimonTableName; PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog; this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties(); - this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap(); this.hadoopAuthenticator = paimonExternalCatalog.getExecutionAuthenticator(); - this.ctlId = paimonExternalCatalog.getId(); ExternalDatabase database = paimonExternalCatalog.getDb(paimonTableName.getDb()) .orElseThrow(() -> new AnalysisException( String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb()) )); - this.dbId = database.getId(); ExternalTable externalTable = database.getTable(paimonTableName.getTbl()) .orElseThrow(() -> new AnalysisException( @@ -103,7 +93,6 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th paimonTableName.getDb(), paimonTableName.getTbl()) )); NameMapping buildNameMapping = externalTable.getOrBuildNameMapping(); - this.tblId = externalTable.getId(); this.paimonSysTable = 
paimonExternalCatalog.getPaimonSystemTable(buildNameMapping, queryType); @@ -148,7 +137,7 @@ public TMetadataType getMetadataType() { } @Override - public List getMetaScanRanges(List requiredFileds) { + public TMetaScanRange getMetaScanRange(List requiredFileds) { int[] projections = requiredFileds.stream().mapToInt( field -> paimonSysTable.rowType().getFieldNames() .stream() @@ -157,7 +146,6 @@ public List getMetaScanRanges(List requiredFileds) { .indexOf(field)) .toArray(); List splits; - try { splits = hadoopAuthenticator.execute( () -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits()); @@ -165,7 +153,13 @@ public List getMetaScanRanges(List requiredFileds) { throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e)); } - return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList()); + TMetaScanRange tMetaScanRange = new TMetaScanRange(); + tMetaScanRange.setMetadataType(TMetadataType.PAIMON); + tMetaScanRange.setHadoopProps(hadoopProps); + tMetaScanRange.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable)); + tMetaScanRange.setSerializedSplits( + splits.stream().map(PaimonUtil::encodeObjectToString).collect(Collectors.toList())); + return tMetaScanRange; } @Override @@ -177,23 +171,4 @@ public String getTableName() { public List getTableColumns() throws AnalysisException { return schema; } - - private TMetaScanRange createMetaScanRange(Split split) { - TMetaScanRange tMetaScanRange = new TMetaScanRange(); - tMetaScanRange.setMetadataType(TMetadataType.PAIMON); - - TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams(); - tPaimonMetadataParams.setCtlId(ctlId); - tPaimonMetadataParams.setDbId(dbId); - tPaimonMetadataParams.setTblId(tblId); - tPaimonMetadataParams.setQueryType(queryType); - tPaimonMetadataParams.setDbName(paimonTableName.getDb()); - tPaimonMetadataParams.setTblName(paimonTableName.getTbl()); - tPaimonMetadataParams.setHadoopProps(hadoopProps); - 
tPaimonMetadataParams.setPaimonProps(paimonProps); - tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split)); - - tMetaScanRange.setPaimonParams(tPaimonMetadataParams); - return tMetaScanRange; - } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7faf8683b66262..cf4e88a0906a37 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -537,24 +537,23 @@ struct TDataGenScanRange { } +// deprecated struct TIcebergMetadataParams { - 1: optional string serialized_task // deprecated + 1: optional string serialized_task 2: optional map hadoop_props - 3: optional string serialized_table } - +// deprecated struct TPaimonMetadataParams { - 1: optional string db_name // deprecated - 2: optional string tbl_name // deprecated - 3: optional string query_type // deprecated - 4: optional i64 ctl_id // deprecated - 5: optional i64 db_id // deprecated - 6: optional i64 tbl_id // deprecated - 7: optional string serialized_split // deprecated + 1: optional string db_name + 2: optional string tbl_name + 3: optional string query_type + 4: optional i64 ctl_id + 5: optional i64 db_id + 6: optional i64 tbl_id + 7: optional string serialized_split 8: optional map hadoop_props - 9: optional map paimon_props // deprecated - 10: optional string serialized_table + 9: optional map paimon_props } struct THudiMetadataParams { @@ -614,7 +613,7 @@ struct TMetaCacheStatsParams { struct TMetaScanRange { 1: optional Types.TMetadataType metadata_type - 2: optional TIcebergMetadataParams iceberg_params + 2: optional TIcebergMetadataParams iceberg_params // deprecated 3: optional TBackendsMetadataParams backends_params 4: optional TFrontendsMetadataParams frontends_params 5: optional TQueriesMetadataParams queries_params @@ -625,10 +624,12 @@ struct TMetaScanRange { 10: optional TMetaCacheStatsParams meta_cache_stats_params 11: optional TPartitionValuesMetadataParams partition_values_params 12: optional 
THudiMetadataParams hudi_params - 13: optional TPaimonMetadataParams paimon_params + 13: optional TPaimonMetadataParams paimon_params // deprecated - // ... more params - 999: optional list splits + // for querying sys tables for Paimon/Iceberg + 14: optional map hadoop_props + 15: optional string serialized_table; + 16: optional list serialized_splits; } // Specification of an individual data range which is held in its entirety From 3b98e12d25983dc7ce65342e4c0482e430bd7e24 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 19:59:41 +0800 Subject: [PATCH 06/13] split the tasks to be --- .../tvf/source/MetadataScanNode.java | 57 +++++++++++++++---- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java index 7a91ad8c2b7c04..790617ace18b09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java @@ -73,20 +73,55 @@ protected void createScanRangeLocations() { .filter(slot -> slot.isMaterialized()) .map(slot -> slot.getColumn().getName()) .collect(java.util.stream.Collectors.toList()); - TScanRange scanRange = new TScanRange(); - scanRange.setMetaScanRange(tvf.getMetaScanRange(requiredFileds)); + TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFileds); - // TODO: split ranges to be - TScanRangeLocation location = new TScanRangeLocation(); - Backend backend = backendPolicy.getNextBe(); - location.setBackendId(backend.getId()); - location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + if (!metaScanRange.isSetSerializedSplits()) { + // no need to split ranges to send to backends + TScanRangeLocation location = new TScanRangeLocation(); + Backend backend = backendPolicy.getNextBe(); +
location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); - TScanRangeLocations locations = new TScanRangeLocations(); - locations.addToLocations(location); - locations.setScanRange(scanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setMetaScanRange(metaScanRange); - scanRangeLocations.add(locations); + TScanRangeLocations locations = new TScanRangeLocations(); + locations.addToLocations(location); + locations.setScanRange(scanRange); + + scanRangeLocations.add(locations); + } else { + // need to split ranges to send to backends + List backends = Lists.newArrayList(backendPolicy.getBackends()); + List splits = metaScanRange.getSerializedSplits(); + int numSplitsPerBE = Math.max(1, splits.size() / backends.size()); + + for (int i = 0; i < backends.size(); i++) { + int from = i * numSplitsPerBE; + if (from >= splits.size()) { + continue; // no splits for this backend + } + int to = Math.min((i + 1) * numSplitsPerBE, splits.size()); + + // set split task to TMetaScanRange + TMetaScanRange subRange = metaScanRange.deepCopy(); + subRange.setSerializedSplits(splits.subList(from, to)); + + TScanRangeLocation location = new TScanRangeLocation(); + Backend backend = backends.get(i); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + + TScanRange scanRange = new TScanRange(); + scanRange.setMetaScanRange(subRange); + + TScanRangeLocations locations = new TScanRangeLocations(); + locations.addToLocations(location); + locations.setScanRange(scanRange); + + scanRangeLocations.add(locations); + } + } } @Override From ab3001bbbf3ee7f6374a90ab153a3f055ed84405 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 21:40:16 +0800 Subject: [PATCH 07/13] clean meta_scanner and fix jni --- .../table/iceberg_sys_table_jni_reader.cpp | 11 +++++---- .../table/iceberg_sys_table_jni_reader.h | 4 ++--
.../table/paimon_sys_table_jni_reader.cpp | 19 ++++++--------- .../table/paimon_sys_table_jni_reader.h | 5 ++-- be/src/vec/exec/scan/meta_scanner.cpp | 23 ------------------- be/src/vec/exec/scan/meta_scanner.h | 2 -- 6 files changed, 17 insertions(+), 47 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp index 55ee05066755c2..a833bc52332179 100644 --- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp @@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop."; IcebergSysTableJniReader::IcebergSysTableJniReader( const std::vector& file_slot_descs, RuntimeState* state, - RuntimeProfile* profile, const TIcebergMetadataParams& range_params) - : JniReader(file_slot_descs, state, profile), _range_params(range_params) {} + RuntimeProfile* profile, const TMetaScanRange& meta_scan_range) + : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {} Status IcebergSysTableJniReader::init_reader( const std::unordered_map* colname_to_value_range) { @@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader( required_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type())); } std::map params; - params["serialized_task"] = _range_params.serialized_task; + // "," is not in base64 + params["serialized_tasks"] = join(_meta_scan_range.serialized_splits, ","); params["required_fields"] = join(required_fields, ","); params["required_types"] = join(required_types, "#"); - params["time_zone"] = _state->timezone_obj().name(); - for (const auto& kv : _range_params.hadoop_props) { + params["time_zone"] = _state->timezone(); + for (const auto& kv : _meta_scan_range.hadoop_props) { params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; } _jni_connector = diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h 
b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h index ed867e46abe1a0..982f4357343f58 100644 --- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h +++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h @@ -47,7 +47,7 @@ class IcebergSysTableJniReader : public JniReader { public: IcebergSysTableJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, - const TIcebergMetadataParams& range_params); + const TMetaScanRange& meta_scan_range); ~IcebergSysTableJniReader() override = default; @@ -55,7 +55,7 @@ class IcebergSysTableJniReader : public JniReader { const std::unordered_map* colname_to_value_range); private: - const TIcebergMetadataParams& _range_params; + const TMetaScanRange& _meta_scan_range; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp index 147ebfba179e26..6e9c7f50c7e1c3 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -27,8 +27,8 @@ const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop."; PaimonSysTableJniReader::PaimonSysTableJniReader( const std::vector& file_slot_descs, RuntimeState* state, - RuntimeProfile* profile, const TPaimonMetadataParams& range_params) - : JniReader(file_slot_descs, state, profile), _range_params(range_params) { + RuntimeProfile* profile, const TMetaScanRange& meta_scan_range) + : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) { std::vector required_fields; std::vector required_types; for (const auto& desc : _file_slot_descs) { @@ -37,18 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader( } std::map params; - params["db_name"] = _range_params.db_name; - params["tbl_name"] = _range_params.tbl_name; - params["query_type"] = _range_params.query_type; - params["serialized_split"] = 
_range_params.serialized_split; + params["serialized_table"] = _meta_scan_range.serialized_table; + // "," is not in base64 + params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ","); params["required_fields"] = join(required_fields, ","); params["required_types"] = join(required_types, "#"); - - for (const auto& kv : _range_params.paimon_props) { - params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; - } - - for (const auto& kv : _range_params.hadoop_props) { + params["time_zone"] = _state->timezone(); + for (const auto& kv : _meta_scan_range.hadoop_props) { params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; } diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h index f7b3108f5afd6a..a6f43899e2db96 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h @@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader { public: static const std::string HADOOP_OPTION_PREFIX; - static const std::string PAIMON_OPTION_PREFIX; PaimonSysTableJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, - const TPaimonMetadataParams& range_params); + const TMetaScanRange& meta_scan_range); ~PaimonSysTableJniReader() override = default; @@ -58,7 +57,7 @@ class PaimonSysTableJniReader : public JniReader { private: const std::unordered_map* _colname_to_value_range; - const TPaimonMetadataParams& _range_params; + const TMetaScanRange& _meta_scan_range; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index d64806b5e0879b..58757bd186537a 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -242,9 +242,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { VLOG_CRITICAL << "MetaScanner::_fetch_metadata"; 
TFetchSchemaTableDataRequest request; switch (meta_scan_range.metadata_type) { - case TMetadataType::ICEBERG: - RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request)); - break; case TMetadataType::HUDI: RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request)); break; @@ -310,26 +307,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } -Status MetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, - TFetchSchemaTableDataRequest* request) { - VLOG_CRITICAL << "MetaScanner::_build_iceberg_metadata_request"; - if (!meta_scan_range.__isset.iceberg_params) { - return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range."); - } - - // create request - request->__set_cluster_name(""); - request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); - - // create TMetadataTableRequestParams - TMetadataTableRequestParams metadata_table_params; - metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG); - metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params); - - request->__set_metada_table_params(metadata_table_params); - return Status::OK(); -} - Status MetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "MetaScanner::_build_hudi_metadata_request"; diff --git a/be/src/vec/exec/scan/meta_scanner.h b/be/src/vec/exec/scan/meta_scanner.h index f9672ef7567e1d..dd8320b4dfd691 100644 --- a/be/src/vec/exec/scan/meta_scanner.h +++ b/be/src/vec/exec/scan/meta_scanner.h @@ -64,8 +64,6 @@ class MetaScanner : public Scanner { private: Status _fill_block_with_remote_data(const std::vector& columns); Status _fetch_metadata(const TMetaScanRange& meta_scan_range); - Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, - TFetchSchemaTableDataRequest* request); Status 
_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range, From 956664cde83ba8088e9251332bd1513a287ad0f1 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 21:40:23 +0800 Subject: [PATCH 08/13] fix --- .../org/apache/doris/tablefunction/HudiTableValuedFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java index c203b07de91946..2125d420fa5840 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java @@ -132,7 +132,7 @@ public TMetaScanRange getMetaScanRange(List requiredFileds) { metaScanRange.setHudiParams(hudiMetadataParams); return metaScanRange; } - + @Override public String getTableName() { return "HudiMetadataTableValuedFunction"; From 015b72afa90359e620127d1287da4c0e68ee6547 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 14 Aug 2025 22:46:32 +0800 Subject: [PATCH 09/13] refactor IcebergSysTableJniScanner --- .../iceberg/IcebergSysTableJniScanner.java | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java index 350d0d872c3941..da64afee91bb6f 100644 --- a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java +++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java @@ -23,6 +23,7 @@ import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; +import com.google.common.base.Preconditions; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.StructLike; import org.apache.iceberg.io.CloseableIterator; @@ -34,6 +35,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner { private static final String HADOOP_OPTION_PREFIX = "hadoop."; private final ClassLoader classLoader; private final PreExecutionAuthenticator preExecutionAuthenticator; - private final FileScanTask scanTask; + private final Iterator scanTasks; private final List fields; private final String timezone; private CloseableIterator reader; public IcebergSysTableJniScanner(int batchSize, Map params) { this.classLoader = this.getClass().getClassLoader(); - this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task")); + List scanTasks = Arrays.stream(params.get("serialized_splits").split(",")) + .map(SerializationUtil::deserializeFromBase64) + .map(obj -> (FileScanTask) obj) + .collect(Collectors.toList()); + Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks should not be empty"); + this.scanTasks = scanTasks.iterator(); String[] requiredFields = params.get("required_fields").split(","); - this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields); + this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields); this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); Map hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) @@ -69,8 +77,14 @@ public IcebergSysTableJniScanner(int batchSize, Map params) { @Override public void
open() throws IOException { + Thread.currentThread().setContextClassLoader(classLoader); + nextScanTask(); + } + + private void nextScanTask() throws IOException { + Preconditions.checkArgument(scanTasks.hasNext()); + FileScanTask scanTask = scanTasks.next(); try { - Thread.currentThread().setContextClassLoader(classLoader); preExecutionAuthenticator.execute(() -> { // execute FileScanTask to get rows reader = scanTask.asDataTask().rows().iterator(); @@ -78,7 +92,7 @@ public void open() throws IOException { }); } catch (Exception e) { this.close(); - String msg = String.format("Failed to open IcebergMetadataJniScanner"); + String msg = String.format("Failed to open next scan task: %s", scanTask); LOG.error(msg, e); throw new IOException(msg, e); } @@ -86,11 +100,14 @@ public void open() throws IOException { @Override protected int getNext() throws IOException { - if (reader == null) { + if (reader == null || scanTasks == null) { return 0; } int rows = 0; - while (reader.hasNext() && rows < getBatchSize()) { + while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) { + if (!reader.hasNext()) { + nextScanTask(); + } StructLike row = reader.next(); for (int i = 0; i < fields.size(); i++) { NestedField field = fields.get(i); From 080db78d350e5f52025e689fac16fbad3fed8af3 Mon Sep 17 00:00:00 2001 From: Socrates Date: Fri, 15 Aug 2025 15:46:32 +0800 Subject: [PATCH 10/13] refactor PaimonSysTableJniScanner --- .../iceberg/IcebergSysTableJniScanner.java | 3 - .../apache/doris/paimon/PaimonJniScanner.java | 3 +- .../paimon/PaimonSysTableJniScanner.java | 146 ++++++++---------- 3 files changed, 65 insertions(+), 87 deletions(-) diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java index da64afee91bb6f..9603ed35504a6c 100644 --- 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java +++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java @@ -100,9 +100,6 @@ private void nextScanTask() throws IOException { @Override protected int getNext() throws IOException { - if (reader == null || scanTasks == null) { - return 0; - } int rows = 0; while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) { if (!reader.hasNext()) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 002ceac3537495..4dc1fe90a337ae 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -55,7 +55,6 @@ public class PaimonJniScanner extends JniScanner { private final PaimonColumnValue columnValue = new PaimonColumnValue(); private List paimonAllFieldNames; private List paimonDataTypeList; - private final String timeZone; private RecordReader.RecordIterator recordIterator = null; private final ClassLoader classLoader; private PreExecutionAuthenticator preExecutionAuthenticator; @@ -74,7 +73,7 @@ public PaimonJniScanner(int batchSize, Map params) { } paimonSplit = params.get("paimon_split"); paimonPredicate = params.get("paimon_predicate"); - this.timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); + String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); columnValue.setTimeZone(timeZone); initTableInfo(columnTypes, requiredFields, batchSize); hadoopOptionParams = params.entrySet().stream() diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java index 3a708690b4c19e..79289bb8256cef 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -22,6 +22,7 @@ import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; +import com.google.common.base.Preconditions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.Table; @@ -34,33 +35,32 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TimeZone; import java.util.stream.Collectors; /** * JNI-based scanner for reading Apache Paimon system tables + * TODO: unify this with PaimonJniScanner in future */ public class PaimonSysTableJniScanner extends JniScanner { private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class); private static final String HADOOP_OPTION_PREFIX = "hadoop."; - private static final String PAIMON_OPTION_PREFIX = "paimon."; - - private final Map params; - private final Map hadoopOptionParams; - private final Map paimonOptionParams; private final ClassLoader classLoader; - private final Split paimonSplit; - private Table table; - private RecordReader reader; + private final Table table; + private final Iterator paimonSplits; private final PaimonColumnValue columnValue = new PaimonColumnValue(); - private List paimonDataTypeList; - private List paimonAllFieldNames; + private final List paimonAllFieldNames; + private final int[] projected; + private final List paimonDataTypeList; private final PreExecutionAuthenticator preExecutionAuthenticator; - 
private RecordReader.RecordIterator recordIterator = null; + private RecordReader reader; + private RecordReader.RecordIterator recordIterator; /** * Constructs a new PaimonSysTableJniScanner for reading Paimon system tables. @@ -70,50 +70,60 @@ public PaimonSysTableJniScanner(int batchSize, Map params) { if (LOG.isDebugEnabled()) { LOG.debug("params:{}", params); } - this.params = params; String[] requiredFields = params.get("required_fields").split(","); String[] requiredTypes = params.get("required_types").split("#"); ColumnType[] columnTypes = new ColumnType[requiredTypes.length]; for (int i = 0; i < requiredTypes.length; i++) { columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]); } + String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID()); + columnValue.setTimeZone(timeZone); initTableInfo(columnTypes, requiredFields, batchSize); this.table = PaimonUtils.deserialize(params.get("serialized_table")); this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); if (LOG.isDebugEnabled()) { LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames); } - this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split")); - this.hadoopOptionParams = params.entrySet().stream() + resetDatetimeV2Precision(); + this.projected = Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray(); + this.paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)) + .collect(Collectors.toList()); + this.paimonSplits = Arrays.stream(params.get("serialized_splits").split(",")) + .map(PaimonUtils::deserialize).map(obj -> (Split) obj) + .collect(Collectors.toList()).iterator(); + Map hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) - .collect(Collectors - .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), - Entry::getValue)); - this.paimonOptionParams = params.entrySet().stream() - .filter(kv -> 
kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) - .collect(Collectors - .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), - Entry::getValue)); + .collect(Collectors.toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), + Entry::getValue)); this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams); } @Override - public void open() { + public void open() throws IOException { + Thread.currentThread().setContextClassLoader(classLoader); + if (!paimonSplits.hasNext()) { + throw new IOException("Failed to open PaimonSysTableJniScanner: No valid splits found"); + } + nextReader(); + } + + private void nextReader() throws IOException { + Preconditions.checkArgument(paimonSplits.hasNext(), "No more splits available"); + Split paimonSplit = paimonSplits.next(); + ReadBuilder readBuilder = table.newReadBuilder(); + readBuilder.withProjection(projected); try { - // When the user does not specify hive-site.xml, Paimon will look for the file from the classpath: - // org.apache.paimon.hive.HiveCatalog.createHiveConf: - // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)` - // so we need to provide a classloader, otherwise it will cause NPE. 
- Thread.currentThread().setContextClassLoader(classLoader); preExecutionAuthenticator.execute(() -> { - initReader(); + reader = readBuilder.newRead().executeFilter().createReader(paimonSplit); + Preconditions.checkArgument(recordIterator == null); + recordIterator = reader.readBatch(); return null; }); - resetDatetimeV2Precision(); - - } catch (Throwable e) { - LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e); - throw new RuntimeException(e); + } catch (Exception e) { + this.close(); + String msg = String.format("Failed to open next paimonSplit: %s", paimonSplits); + LOG.error(msg, e); + throw new IOException(msg, e); } } @@ -129,28 +139,8 @@ protected int getNext() throws IOException { try { return preExecutionAuthenticator.execute(this::readAndProcessNextBatch); } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void initReader() throws IOException { - ReadBuilder readBuilder = table.newReadBuilder(); - if (this.fields.length > this.paimonAllFieldNames.size()) { - throw new IOException( - String.format( - "The jni reader fields' size {%s} is not matched with paimon fields' size {%s}." 
- + " Please refresh table and try again", - fields.length, paimonAllFieldNames.size())); + throw new IOException("Failed to getNext in PaimonSysTableJniScanner", e); } - int[] projected = getProjected(); - readBuilder.withProjection(projected); - reader = readBuilder.newRead().executeFilter().createReader(paimonSplit); - paimonDataTypeList = - Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); - } - - private int[] getProjected() { - return Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray(); } private void resetDatetimeV2Precision() { @@ -171,35 +161,27 @@ private void resetDatetimeV2Precision() { private int readAndProcessNextBatch() throws IOException { int rows = 0; - try { - if (recordIterator == null) { - recordIterator = reader.readBatch(); - } - - while (recordIterator != null) { - InternalRow record; - while ((record = recordIterator.next()) != null) { - columnValue.setOffsetRow(record); - for (int i = 0; i < fields.length; i++) { - columnValue.setIdx(i, types[i], paimonDataTypeList.get(i)); - long l = System.nanoTime(); - appendData(i, columnValue); - appendDataTime += System.nanoTime() - l; - } - rows++; - if (rows >= batchSize) { - return rows; - } + while (recordIterator != null) { + InternalRow record; + while ((record = recordIterator.next()) != null) { + columnValue.setOffsetRow(record); + for (int i = 0; i < fields.length; i++) { + columnValue.setIdx(i, types[i], paimonDataTypeList.get(i)); + long l = System.nanoTime(); + appendData(i, columnValue); + appendDataTime += System.nanoTime() - l; + } + rows++; + if (rows >= batchSize) { + return rows; } - recordIterator.releaseBatch(); - recordIterator = reader.readBatch(); } - } catch (Exception e) { - close(); - LOG.warn("Failed to get the next batch of paimon. 
" - + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", - paimonSplit, params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e); - throw new IOException(e); + recordIterator.releaseBatch(); + recordIterator = reader.readBatch(); + if (recordIterator == null && paimonSplits.hasNext()) { + // try to get next reader + nextReader(); + } } return rows; } From 02b66c582d55ac6e6c27ce0311387cec555347d6 Mon Sep 17 00:00:00 2001 From: Socrates Date: Fri, 15 Aug 2025 16:04:57 +0800 Subject: [PATCH 11/13] fix build --- be/src/vec/exec/scan/meta_scanner.cpp | 8 ++++---- .../doris/tablefunction/JobsTableValuedFunction.java | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index 58757bd186537a..4e95c13f2f030a 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -67,14 +67,14 @@ Status MetaScanner::open(RuntimeState* state) { RETURN_IF_ERROR(Scanner::open(state)); if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) { // TODO: refactor this code - auto reader = IcebergSysTableJniReader::create_unique( - _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params); + auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, + _scan_range.meta_scan_range); const std::unordered_map colname_to_value_range; RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); _reader = std::move(reader); } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) { - auto reader = PaimonSysTableJniReader::create_unique( - _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params); + auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile, + _scan_range.meta_scan_range); const std::unordered_map colname_to_value_range; 
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); _reader = std::move(reader); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java index 40420b979a7845..d9c5ec5ed2b4ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java @@ -31,7 +31,6 @@ import org.apache.doris.thrift.TMetadataType; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; From fa286d6cecc7ffd4a3679a5584399855335c11e0 Mon Sep 17 00:00:00 2001 From: Socrates Date: Fri, 15 Aug 2025 16:20:09 +0800 Subject: [PATCH 12/13] fix --- be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp index a833bc52332179..ffcae20df9dce2 100644 --- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp @@ -40,7 +40,7 @@ Status IcebergSysTableJniReader::init_reader( } std::map params; // "," is not in base64 - params["serialized_tasks"] = join(_meta_scan_range.serialized_splits, ","); + params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ","); params["required_fields"] = join(required_fields, ","); params["required_types"] = join(required_types, "#"); params["time_zone"] = _state->timezone(); From df280e5db12053c167006f96b399f5aa9bf09eb9 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sun, 17 Aug 2025 21:50:28 +0800 Subject: [PATCH 13/13] fix --- .../apache/doris/datasource/tvf/source/MetadataScanNode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java index 790617ace18b09..1c12a671011143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java @@ -69,11 +69,11 @@ protected void toThrift(TPlanNode planNode) { @Override protected void createScanRangeLocations() { - List requiredFileds = desc.getSlots().stream() + List requiredFields = desc.getSlots().stream() .filter(slot -> slot.isMaterialized()) .map(slot -> slot.getColumn().getName()) .collect(java.util.stream.Collectors.toList()); - TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFileds); + TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFields); if (!metaScanRange.isSetSerializedSplits()) { // no need to split ranges to send to backends