diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
index 55ee05066755c2..ffcae20df9dce2 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
 
 IcebergSysTableJniReader::IcebergSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
-        RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
-        : JniReader(file_slot_descs, state, profile), _range_params(range_params) {}
+        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {}
 
 Status IcebergSysTableJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
@@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader(
         required_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type()));
     }
     std::map<std::string, std::string> params;
-    params["serialized_task"] = _range_params.serialized_task;
+    // "," is not in base64
+    params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
     params["required_fields"] = join(required_fields, ",");
     params["required_types"] = join(required_types, "#");
-    params["time_zone"] = _state->timezone_obj().name();
-    for (const auto& kv : _range_params.hadoop_props) {
+    params["time_zone"] = _state->timezone();
+    for (const auto& kv : _meta_scan_range.hadoop_props) {
         params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
     }
     _jni_connector =
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
index ed867e46abe1a0..982f4357343f58 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -47,7 +47,7 @@ class IcebergSysTableJniReader : public JniReader {
 public:
     IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile,
-                             const TIcebergMetadataParams& range_params);
+                             const TMetaScanRange& meta_scan_range);
 
     ~IcebergSysTableJniReader() override = default;
 
@@ -55,7 +55,7 @@ class IcebergSysTableJniReader : public JniReader {
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
 private:
-    const TIcebergMetadataParams& _range_params;
+    const TMetaScanRange& _meta_scan_range;
 };
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index b691771dafb011..0ffbe79a94a64e 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -49,16 +49,8 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
         column_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type()));
     }
     std::map<std::string, std::string> params;
-    params["db_name"] = range.table_format_params.paimon_params.db_name;
-    params["table_name"] = range.table_format_params.paimon_params.table_name;
     params["paimon_split"] = range.table_format_params.paimon_params.paimon_split;
-    params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names;
     params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate;
-    params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id);
-    params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id);
-    params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id);
-    params["last_update_time"] =
-            std::to_string(range.table_format_params.paimon_params.last_update_time);
    params["required_fields"] = join(column_names, ",");
     params["columns_types"] = join(column_types, "#");
     params["time_zone"] = _state->timezone();
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
index e6b0263ed25053..6e9c7f50c7e1c3 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
@@ -24,12 +24,11 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
 const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
-const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";
 
 PaimonSysTableJniReader::PaimonSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
-        RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
-        : JniReader(file_slot_descs, state, profile), _range_params(range_params) {
+        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {
     std::vector<std::string> required_fields;
     std::vector<std::string> required_types;
     for (const auto& desc : _file_slot_descs) {
@@ -38,21 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
     }
     std::map<std::string, std::string> params;
-    params["db_name"] = _range_params.db_name;
-    params["tbl_name"] = _range_params.tbl_name;
-    params["query_type"] = _range_params.query_type;
-    params["ctl_id"] = std::to_string(_range_params.ctl_id);
-    params["db_id"] = std::to_string(_range_params.db_id);
-    params["tbl_id"] = std::to_string(_range_params.tbl_id);
-    params["serialized_split"] = _range_params.serialized_split;
+    params["serialized_table"] = _meta_scan_range.serialized_table;
+    // "," is not in base64
+    params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
     params["required_fields"] = join(required_fields, ",");
     params["required_types"] = join(required_types, "#");
-
-    for (const auto& kv : _range_params.paimon_props) {
-        params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
-    }
-
-    for (const auto& kv : _range_params.hadoop_props) {
+    params["time_zone"] = _state->timezone();
+    for (const auto& kv : _meta_scan_range.hadoop_props) {
         params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
     }
 
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
index f7b3108f5afd6a..a6f43899e2db96 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
@@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader {
 public:
     static const std::string HADOOP_OPTION_PREFIX;
-    static const std::string PAIMON_OPTION_PREFIX;
 
     PaimonSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                             RuntimeState* state, RuntimeProfile* profile,
-                            const TPaimonMetadataParams& range_params);
+                            const TMetaScanRange& meta_scan_range);
 
     ~PaimonSysTableJniReader() override = default;
 
@@ -58,7 +57,7 @@ class PaimonSysTableJniReader : public JniReader {
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
 private:
     const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
-    const TPaimonMetadataParams& _range_params;
+    const TMetaScanRange& _meta_scan_range;
 };
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp
index d64806b5e0879b..4e95c13f2f030a 100644
--- a/be/src/vec/exec/scan/meta_scanner.cpp
+++ b/be/src/vec/exec/scan/meta_scanner.cpp
@@ -67,14 +67,14 @@ Status MetaScanner::open(RuntimeState* state) {
     RETURN_IF_ERROR(Scanner::open(state));
     if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
         // TODO: refactor this code
-        auto reader = IcebergSysTableJniReader::create_unique(
-                _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params);
+        auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                              _scan_range.meta_scan_range);
         const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
         RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
         _reader = std::move(reader);
     } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) {
-        auto reader = PaimonSysTableJniReader::create_unique(
-                _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params);
+        auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                             _scan_range.meta_scan_range);
         const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
         RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
         _reader = std::move(reader);
@@ -242,9 +242,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
     VLOG_CRITICAL << "MetaScanner::_fetch_metadata";
     TFetchSchemaTableDataRequest request;
     switch (meta_scan_range.metadata_type) {
-    case TMetadataType::ICEBERG:
-        RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request));
-        break;
     case TMetadataType::HUDI:
         RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request));
         break;
@@ -310,26 +307,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
     return Status::OK();
 }
 
-Status MetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
-                                                    TFetchSchemaTableDataRequest* request) {
-    VLOG_CRITICAL << "MetaScanner::_build_iceberg_metadata_request";
-    if (!meta_scan_range.__isset.iceberg_params) {
-        return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range.");
-    }
-
-    // create request
-    request->__set_cluster_name("");
-    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
-
-    // create TMetadataTableRequestParams
-    TMetadataTableRequestParams metadata_table_params;
-    metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
-    metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);
-
-    request->__set_metada_table_params(metadata_table_params);
-    return Status::OK();
-}
-
 Status MetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
                                                  TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "MetaScanner::_build_hudi_metadata_request";
diff --git a/be/src/vec/exec/scan/meta_scanner.h b/be/src/vec/exec/scan/meta_scanner.h
index f9672ef7567e1d..dd8320b4dfd691 100644
--- a/be/src/vec/exec/scan/meta_scanner.h
+++ b/be/src/vec/exec/scan/meta_scanner.h
@@ -64,8 +64,6 @@ class MetaScanner : public Scanner {
 private:
     Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns);
     Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
-    Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
-                                           TFetchSchemaTableDataRequest* request);
     Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
                                         TFetchSchemaTableDataRequest* request);
     Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range,
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
index 350d0d872c3941..9603ed35504a6c 100644
--- a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -23,6 +23,7 @@
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
+import com.google.common.base.Preconditions;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.io.CloseableIterator;
@@ -34,6 +35,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner {
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
     private final ClassLoader classLoader;
     private final PreExecutionAuthenticator preExecutionAuthenticator;
-    private final FileScanTask scanTask;
+    private final Iterator<FileScanTask> scanTasks;
     private final List<NestedField> fields;
     private final String timezone;
     private CloseableIterator<StructLike> reader;
 
     public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
         this.classLoader = this.getClass().getClassLoader();
-        this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
+        List<FileScanTask> scanTasks = Arrays.stream(params.get("serialized_splits").split(","))
+                .map(SerializationUtil::deserializeFromBase64)
+                .map(obj -> (FileScanTask) obj)
+                .collect(Collectors.toList());
+        Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks should not be empty");
+        this.scanTasks = scanTasks.iterator();
         String[] requiredFields = params.get("required_fields").split(",");
-        this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
+        this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields);
         this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
         Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -69,8 +77,14 @@ public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
 
     @Override
     public void open() throws IOException {
+        Thread.currentThread().setContextClassLoader(classLoader);
+        nextScanTask();
+    }
+
+    private void nextScanTask() throws IOException {
+        Preconditions.checkArgument(scanTasks.hasNext());
+        FileScanTask scanTask = scanTasks.next();
         try {
-            Thread.currentThread().setContextClassLoader(classLoader);
             preExecutionAuthenticator.execute(() -> {
                 // execute FileScanTask to get rows
                 reader = scanTask.asDataTask().rows().iterator();
             });
         } catch (Exception e) {
             this.close();
-            String msg = String.format("Failed to open IcebergMetadataJniScanner");
+            String msg = String.format("Failed to open next scan task: %s", scanTask);
             LOG.error(msg, e);
             throw new IOException(msg, e);
         }
@@ -86,11 +100,11 @@ public void open() throws IOException {
 
     @Override
     protected int getNext() throws IOException {
-        if (reader == null) {
-            return 0;
-        }
         int rows = 0;
-        while (reader.hasNext() && rows < getBatchSize()) {
+        while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) {
+            if (!reader.hasNext()) {
+                nextScanTask();
+            }
             StructLike row = reader.next();
             for (int i = 0; i < fields.size(); i++) {
                 NestedField field = fields.get(i);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 7565d028ee7a4a..4dc1fe90a337ae 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -44,13 +44,9 @@ public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
 
-    @Deprecated
-    private static final String PAIMON_OPTION_PREFIX = "paimon.";
-    @Deprecated
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
     private final Map<String, String> params;
-    @Deprecated
     private final Map<String, String> hadoopOptionParams;
     private final String paimonSplit;
     private final String paimonPredicate;
@@ -59,7 +55,6 @@ public class PaimonJniScanner extends JniScanner {
     private final PaimonColumnValue columnValue = new PaimonColumnValue();
     private List<String> paimonAllFieldNames;
     private List<DataType> paimonDataTypeList;
-    private final String timeZone;
     private RecordReader.RecordIterator<InternalRow> recordIterator = null;
     private final ClassLoader classLoader;
     private PreExecutionAuthenticator preExecutionAuthenticator;
@@ -78,7 +73,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
         }
         paimonSplit = params.get("paimon_split");
         paimonPredicate = params.get("paimon_predicate");
-        this.timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
+        String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
         columnValue.setTimeZone(timeZone);
         initTableInfo(columnTypes, requiredFields, batchSize);
         hadoopOptionParams = params.entrySet().stream()
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
index 0344c8b2db4194..79289bb8256cef 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
@@ -21,9 +21,8 @@
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
-import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
-import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
+import com.google.common.base.Preconditions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
@@ -36,39 +35,32 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 /**
  * JNI-based scanner for reading Apache Paimon system tables
+ * TODO: unify this with PaimonJniScanner in the future
  */
 public class PaimonSysTableJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class);
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
-    private static final String PAIMON_OPTION_PREFIX = "paimon.";
-
-    private final Map<String, String> params;
-    private final Map<String, String> hadoopOptionParams;
-    private final Map<String, String> paimonOptionParams;
     private final ClassLoader classLoader;
-    private final Split paimonSplit;
-    private Table table;
-    private RecordReader<InternalRow> reader;
+    private final Table table;
+    private final Iterator<Split> paimonSplits;
     private final PaimonColumnValue columnValue = new PaimonColumnValue();
-    private List<DataType> paimonDataTypeList;
-    private List<String> paimonAllFieldNames;
+    private final List<String> paimonAllFieldNames;
+    private final int[] projected;
+    private final List<DataType> paimonDataTypeList;
     private final PreExecutionAuthenticator preExecutionAuthenticator;
-    private RecordReader.RecordIterator<InternalRow> recordIterator = null;
-    private final long ctlId;
-    private final long dbId;
-    private final long tblId;
-    private final String dbName;
-    private final String tblName;
-    private final String queryType;
+    private RecordReader<InternalRow> reader;
+    private RecordReader.RecordIterator<InternalRow> recordIterator;
 
     /**
      * Constructs a new PaimonSysTableJniScanner for reading Paimon system tables.
@@ -78,52 +70,60 @@ public PaimonSysTableJniScanner(int batchSize, Map<String, String> params) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("params:{}", params);
         }
-        this.params = params;
         String[] requiredFields = params.get("required_fields").split(",");
         String[] requiredTypes = params.get("required_types").split("#");
         ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
         for (int i = 0; i < requiredTypes.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]);
         }
+        String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
+        columnValue.setTimeZone(timeZone);
         initTableInfo(columnTypes, requiredFields, batchSize);
-        this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split"));
-        this.ctlId = Long.parseLong(params.get("ctl_id"));
-        this.dbId = Long.parseLong(params.get("db_id"));
-        this.tblId = Long.parseLong(params.get("tbl_id"));
-        this.dbName = params.get("db_name");
-        this.tblName = params.get("tbl_name");
-        this.queryType = params.get("query_type");
-        this.hadoopOptionParams = params.entrySet().stream()
+        this.table = PaimonUtils.deserialize(params.get("serialized_table"));
+        this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
+        }
+        resetDatetimeV2Precision();
+        this.projected = Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+        this.paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i))
+                .collect(Collectors.toList());
+        this.paimonSplits = Arrays.stream(params.get("serialized_splits").split(","))
+                .map(PaimonUtils::deserialize).map(obj -> (Split) obj)
+                .collect(Collectors.toList()).iterator();
+        Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
-                .collect(Collectors
-                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
-                                Entry::getValue));
-        this.paimonOptionParams = params.entrySet().stream()
-                .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
-                .collect(Collectors
-                        .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
-                                Entry::getValue));
+                .collect(Collectors.toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
+                        Entry::getValue));
         this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
-    public void open() {
+    public void open() throws IOException {
+        Thread.currentThread().setContextClassLoader(classLoader);
+        if (!paimonSplits.hasNext()) {
+            throw new IOException("Failed to open PaimonSysTableJniScanner: No valid splits found");
+        }
+        nextReader();
+    }
+
+    private void nextReader() throws IOException {
+        Preconditions.checkArgument(paimonSplits.hasNext(), "No more splits available");
+        Split paimonSplit = paimonSplits.next();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        readBuilder.withProjection(projected);
         try {
-            // When the user does not specify hive-site.xml, Paimon will look for the file from the classpath:
-            // org.apache.paimon.hive.HiveCatalog.createHiveConf:
-            // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
-            // so we need to provide a classloader, otherwise it will cause NPE.
-            Thread.currentThread().setContextClassLoader(classLoader);
             preExecutionAuthenticator.execute(() -> {
-                initTable();
-                initReader();
+                reader = readBuilder.newRead().executeFilter().createReader(paimonSplit);
+                Preconditions.checkArgument(recordIterator == null);
+                recordIterator = reader.readBatch();
                 return null;
             });
-            resetDatetimeV2Precision();
-
-        } catch (Throwable e) {
-            LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e);
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            this.close();
+            String msg = String.format("Failed to open next paimonSplit: %s", paimonSplit);
+            LOG.error(msg, e);
+            throw new IOException(msg, e);
         }
     }
 
@@ -139,48 +139,10 @@ protected int getNext() throws IOException {
         try {
             return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IOException("Failed to getNext in PaimonSysTableJniScanner", e);
         }
     }
 
-    private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
-                paimonOptionParams, hadoopOptionParams, dbName, tblName, queryType);
-        TableExt tableExt = PaimonTableCache.getTable(key);
-        Table paimonTable = tableExt.getTable();
-        if (paimonTable == null) {
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to get Paimon system table {%s}.{%s}${%s}. ",
-                            dbName, tblName, queryType));
-        }
-        this.table = paimonTable;
-        this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType());
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
-        }
-    }
-
-    private void initReader() throws IOException {
-        ReadBuilder readBuilder = table.newReadBuilder();
-        if (this.fields.length > this.paimonAllFieldNames.size()) {
-            throw new IOException(
-                    String.format(
-                            "The jni reader fields' size {%s} is not matched with paimon fields' size {%s}."
-                                    + " Please refresh table and try again",
-                            fields.length, paimonAllFieldNames.size()));
-        }
-        int[] projected = getProjected();
-        readBuilder.withProjection(projected);
-        reader = readBuilder.newRead().executeFilter().createReader(paimonSplit);
-        paimonDataTypeList =
-                Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
-    }
-
-    private int[] getProjected() {
-        return Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
-    }
-
     private void resetDatetimeV2Precision() {
         for (int i = 0; i < types.length; i++) {
             if (types[i].isDateTimeV2()) {
@@ -199,35 +161,27 @@ private void resetDatetimeV2Precision() {
 
     private int readAndProcessNextBatch() throws IOException {
         int rows = 0;
-        try {
-            if (recordIterator == null) {
-                recordIterator = reader.readBatch();
-            }
-
-            while (recordIterator != null) {
-                InternalRow record;
-                while ((record = recordIterator.next()) != null) {
-                    columnValue.setOffsetRow(record);
-                    for (int i = 0; i < fields.length; i++) {
-                        columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
-                        long l = System.nanoTime();
-                        appendData(i, columnValue);
-                        appendDataTime += System.nanoTime() - l;
-                    }
-                    rows++;
-                    if (rows >= batchSize) {
-                        return rows;
-                    }
+        while (recordIterator != null) {
+            InternalRow record;
+            while ((record = recordIterator.next()) != null) {
+                columnValue.setOffsetRow(record);
+                for (int i = 0; i < fields.length; i++) {
+                    columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
+                    long l = System.nanoTime();
+                    appendData(i, columnValue);
+                    appendDataTime += System.nanoTime() - l;
+                }
+                rows++;
+                if (rows >= batchSize) {
+                    return rows;
                 }
-                recordIterator.releaseBatch();
-                recordIterator = reader.readBatch();
             }
-        } catch (Exception e) {
-            close();
-            LOG.warn("Failed to get the next batch of paimon. "
-                    + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
-                    paimonSplit, params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
-            throw new IOException(e);
+            recordIterator.releaseBatch();
+            recordIterator = reader.readBatch();
+            if (recordIterator == null && paimonSplits.hasNext()) {
+                // try to get next reader
+                nextReader();
+            }
         }
         return rows;
     }
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
deleted file mode 100644
index e5f067af96b728..00000000000000
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.paimon;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-public class PaimonTableCache {
-    // Max cache num of paimon table
-    public static final long max_external_schema_cache_num = 50;
-    // The expiration time of a cache object after last access of it.
-    public static final long external_cache_expire_time_minutes_after_access = 100;
-
-    private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class);
-
-    private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = CacheBuilder.newBuilder()
-            .maximumSize(max_external_schema_cache_num)
-            .expireAfterAccess(external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
-            .build(new CacheLoader<PaimonTableCacheKey, TableExt>() {
-                @Override
-                public TableExt load(PaimonTableCacheKey key) {
-                    return loadTable(key);
-                }
-            });
-
-    private static TableExt loadTable(PaimonTableCacheKey key) {
-        try {
-            LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
-            Table table;
-            if (key.getQueryType() != null) {
-                table = catalog.getTable(new Identifier(key.getDbName(), key.getTblName(),
-                        null, key.getQueryType()));
-            } else {
-                table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
-            }
-            return new TableExt(table, System.currentTimeMillis());
-        } catch (Catalog.TableNotExistException e) {
-            LOG.warn("failed to create paimon table ", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static Catalog createCatalog(
-            Map<String, String> paimonOptionParams,
-            Map<String, String> hadoopOptionParams) {
-        Options options = new Options();
-        paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        Configuration hadoopConf = new Configuration();
-        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options, hadoopConf);
-        return CatalogFactory.createCatalog(context);
-    }
-
-    public static void invalidateTableCache(PaimonTableCacheKey key) {
-        tableCache.invalidate(key);
-    }
-
-    public static TableExt getTable(PaimonTableCacheKey key) {
-        try {
-            return tableCache.get(key);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("failed to get table for:" + key);
-        }
-    }
-
-    public static class TableExt {
-        private Table table;
-        private long createTime;
-
-        public TableExt(Table table, long createTime) {
-            this.table = table;
-            this.createTime = createTime;
-        }
-
-        public Table getTable() {
-            return table;
-        }
-
-        public long getCreateTime() {
-            return createTime;
-        }
-    }
-
-    public static class PaimonTableCacheKey {
-        // in key
-        private final long ctlId;
-        private final long dbId;
-        private final long tblId;
-
-        // not in key
-        private Map<String, String> paimonOptionParams;
-        private Map<String, String> hadoopOptionParams;
-        private String dbName;
-        private String tblName;
-        private String queryType;
-
-        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams,
-                Map<String, String> hadoopOptionParams,
-                String dbName, String tblName) {
-            this.ctlId = ctlId;
-            this.dbId = dbId;
-            this.tblId = tblId;
-            this.paimonOptionParams = paimonOptionParams;
-            this.hadoopOptionParams = hadoopOptionParams;
-            this.dbName = dbName;
-            this.tblName = tblName;
-        }
-
-        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams,
-                Map<String, String> hadoopOptionParams,
-                String dbName, String tblName, String queryType) {
-            this.ctlId = ctlId;
-            this.dbId = dbId;
-            this.tblId = tblId;
-            this.paimonOptionParams = paimonOptionParams;
-            this.hadoopOptionParams = hadoopOptionParams;
-            this.dbName = dbName;
-            this.tblName = tblName;
-            this.queryType = queryType;
-        }
-
-        public long getCtlId() {
-            return ctlId;
-        }
-
-        public long getDbId() {
-            return dbId;
-        }
-
-        public long getTblId() {
-            return tblId;
-        }
-
-        public Map<String, String> getPaimonOptionParams() {
-            return paimonOptionParams;
-        }
-
-        public Map<String, String> getHadoopOptionParams() {
-            return hadoopOptionParams;
-        }
-
-        public String getDbName() {
-            return dbName;
-        }
-
-        public String getTblName() {
-            return tblName;
-        }
-
-        public String getQueryType() {
-            return queryType;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PaimonTableCacheKey that = (PaimonTableCacheKey) o;
-            return ctlId == that.ctlId && dbId == that.dbId && tblId == that.tblId && Objects.equal(
-                    queryType,
-                    that.queryType);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(ctlId, dbId, tblId);
-        }
-
-        @Override
-        public String toString() {
-            return "PaimonTableCacheKey{"
-                    + "ctlId=" + ctlId
-                    + ", dbId=" + dbId
-                    + ", tblId=" + tblId
-                    + ", paimonOptionParams=" + paimonOptionParams
-                    + ", hadoopOptionParams=" + hadoopOptionParams
-                    + ", dbName='" + dbName + '\''
-                    + ", tblName='" + tblName + '\''
-                    + ", queryType='" + queryType + '\''
-                    + '}';
-        }
-    }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 2a3df6a28776ab..a385d408b390dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -83,7 +83,6 @@ public class PaimonScanNode extends FileQueryScanNode {
     private static final String DORIS_START_TIMESTAMP = "startTimestamp";
     private static final String DORIS_END_TIMESTAMP = "endTimestamp";
     private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE = "incrementalBetweenScanMode";
-    private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto";
 
     private enum SplitReadType {
         JNI,
@@ -213,15 +212,6 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
         }
         fileDesc.setFileFormat(fileFormat);
         fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates));
-        fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
-                .collect(Collectors.joining(",")));
-        fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
-        fileDesc.setPaimonOptions(((PaimonExternalCatalog) source.getCatalog()).getPaimonOptionsMap());
-        fileDesc.setTableName(source.getTargetTable().getName());
-        fileDesc.setCtlId(source.getCatalog().getId());
-        fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
-        fileDesc.setTblId(source.getTargetTable().getId());
-        fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
         // The hadoop conf should be same with
         // PaimonExternalCatalog.createCatalog()#getConfiguration()
         fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getBackendStorageProperties());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index aff5dac01e35ca..1c12a671011143 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -69,24 +69,58 @@ protected void toThrift(TPlanNode planNode) {
 
     @Override
     protected void createScanRangeLocations() {
-        List<String> requiredFileds = desc.getSlots().stream()
+        List<String> requiredFields = desc.getSlots().stream()
                 .filter(slot -> slot.isMaterialized())
                 .map(slot -> slot.getColumn().getName())
                 .collect(java.util.stream.Collectors.toList());
-        for (TMetaScanRange metaScanRange : tvf.getMetaScanRanges(requiredFileds)) {
-            TScanRange scanRange = new TScanRange();
-            scanRange.setMetaScanRange(metaScanRange);
+        TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFields);
+        if (!metaScanRange.isSetSerializedSplits()) {
+            // no need to split ranges to send to backends
             TScanRangeLocation location = new TScanRangeLocation();
             Backend backend = backendPolicy.getNextBe();
             location.setBackendId(backend.getId());
             location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+            TScanRange scanRange = new TScanRange();
+            scanRange.setMetaScanRange(metaScanRange);
+
             TScanRangeLocations locations = new TScanRangeLocations();
             locations.addToLocations(location);
             locations.setScanRange(scanRange);
             scanRangeLocations.add(locations);
+        } else {
+            // need to split ranges to send to backends
+            List<Backend> backends = Lists.newArrayList(backendPolicy.getBackends());
+            List<String> splits = metaScanRange.getSerializedSplits();
+            int numSplitsPerBE = Math.max(1, splits.size() / backends.size());
+
+            for (int i = 0; i < backends.size(); i++) {
+                int from = i * numSplitsPerBE;
+                if (from >= splits.size()) {
+                    continue; // no splits for this backend
+                }
+                int to = Math.min((i + 1) * numSplitsPerBE, splits.size());
+
+                // set the split subset on TMetaScanRange
+                TMetaScanRange subRange = metaScanRange.deepCopy();
+                subRange.setSerializedSplits(splits.subList(from, to));
+
+                TScanRangeLocation location = new TScanRangeLocation();
+                Backend backend = backends.get(i);
+                location.setBackendId(backend.getId());
+                location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+                TScanRange scanRange = new TScanRange();
+                scanRange.setMetaScanRange(subRange);
+
+                TScanRangeLocations locations = new TScanRangeLocations();
+                locations.addToLocations(location);
+                locations.setScanRange(scanRange);
+
+                scanRangeLocations.add(locations);
+            }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
index 826e1e7c59ded2..aca6779fc64f23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
@@ -33,7 +33,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -104,13 +103,13 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.BACKENDS);
         TBackendsMetadataParams backendsMetadataParams = new TBackendsMetadataParams();
         backendsMetadataParams.setClusterName("");
         metaScanRange.setBackendsParams(backendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
index 75df6bb62210bc..35719f3fa52ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
@@ -26,7 +26,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -79,9 +78,9 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.CATALOGS);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
index d1b176f35a1fd5..31ddf55313a809 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
@@ -32,7 +32,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -88,13 +87,13 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS);
         TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
index 5acf44f4d3bd10..efb90070e65fe3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
@@ -32,7 +32,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -97,13 +96,13 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS);
         TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
    }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index 70e1ec84928905..2125d420fa5840 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -120,7 +120,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.HUDI);
         // set hudi metadata params
@@ -130,7 +130,7 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
         hudiMetadataParams.setDatabase(hudiTableName.getDb());
         hudiMetadataParams.setTable(hudiTableName.getTbl());
         metaScanRange.setHudiParams(hudiMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index d9ccc392084dbf..359ad2ce57d35e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -31,12 +31,10 @@
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TIcebergMetadataParams;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.FileScanTask;
@@ -48,6 +46,8 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * The class of table valued function for iceberg metadata.
@@ -135,8 +135,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
-        List<TMetaScanRange> scanRanges = Lists.newArrayList();
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         CloseableIterable<FileScanTask> tasks;
         try {
             tasks = preExecutionAuthenticator.execute(() -> {
@@ -145,17 +144,15 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
         } catch (Exception e) {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
         }
-        for (FileScanTask task : tasks) {
-            TMetaScanRange metaScanRange = new TMetaScanRange();
-            metaScanRange.setMetadataType(TMetadataType.ICEBERG);
-            // set iceberg metadata params
-            TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
-            icebergMetadataParams.setHadoopProps(hadoopProps);
-            icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task));
-            metaScanRange.setIcebergParams(icebergMetadataParams);
-            scanRanges.add(metaScanRange);
-        }
-        return scanRanges;
+
+        TMetaScanRange tMetaScanRange = new TMetaScanRange();
+        tMetaScanRange.setMetadataType(TMetadataType.ICEBERG);
+        tMetaScanRange.setHadoopProps(hadoopProps);
+        tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable));
+        tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), false)
+                .map(SerializationUtil::serializeToBase64)
+                .collect(Collectors.toList()));
+        return tMetaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index b5d0489d30c0b4..d9c5ec5ed2b4ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -31,7 +31,6 @@
 import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -99,14 +98,14 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.JOBS);
         TJobsMetadataParams jobParam = new TJobsMetadataParams();
         jobParam.setType(jobType.name());
         jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setJobsParams(jobParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 92b30b0347ba47..32be139657c3e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -60,7 +60,7 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
 
     public abstract TMetadataType getMetadataType();
 
-    public abstract List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds);
+    public abstract TMetaScanRange getMetaScanRange(List<String> requiredFileds);
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index c40a8d4716d1ed..e0c42165530a2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -29,7 +29,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -106,7 +105,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -119,7 +118,7 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
index 91c131603b66de..b9a271bb13ecce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -34,7 +34,6 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
-import org.apache.doris.thrift.TPaimonMetadataParams;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -60,12 +59,7 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
     private final Table paimonSysTable;
     private final List<Column> schema;
     private final Map<String, String> hadoopProps;
-    private final Map<String, String> paimonProps;
     private final ExecutionAuthenticator hadoopAuthenticator;
-    private final TableName paimonTableName;
-    private final long ctlId;
-    private final long dbId;
-    private final long tblId;
 
     /**
      * Creates a new Paimon table-valued function instance.
@@ -84,18 +78,14 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th
             throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog");
         }
 
-        this.paimonTableName = paimonTableName;
         PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog;
         this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties();
-        this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap();
         this.hadoopAuthenticator = paimonExternalCatalog.getExecutionAuthenticator();
-        this.ctlId = paimonExternalCatalog.getId();
 
         ExternalDatabase database = paimonExternalCatalog.getDb(paimonTableName.getDb())
                 .orElseThrow(() -> new AnalysisException(
                         String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb())
                 ));
-        this.dbId = database.getId();
 
         ExternalTable externalTable = database.getTable(paimonTableName.getTbl())
                 .orElseThrow(() -> new AnalysisException(
@@ -103,7 +93,6 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th
                         paimonTableName.getDb(), paimonTableName.getTbl())
                 ));
         NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
-        this.tblId = externalTable.getId();
 
         this.paimonSysTable = paimonExternalCatalog.getPaimonSystemTable(buildNameMapping, queryType);
 
@@ -148,7 +137,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         int[] projections = requiredFileds.stream().mapToInt(
                 field -> paimonSysTable.rowType().getFieldNames()
                         .stream()
@@ -157,7 +146,6 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
                         .indexOf(field))
                 .toArray();
         List<Split> splits;
-
         try {
             splits = hadoopAuthenticator.execute(
                     () -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits());
@@ -165,7 +153,13 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
         }
 
-        return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList());
+        TMetaScanRange tMetaScanRange = new TMetaScanRange();
+        tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
+        tMetaScanRange.setHadoopProps(hadoopProps);
+        tMetaScanRange.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable));
+        tMetaScanRange.setSerializedSplits(
+                splits.stream().map(PaimonUtil::encodeObjectToString).collect(Collectors.toList()));
+        return tMetaScanRange;
     }
 
     @Override
@@ -177,23 +171,4 @@ public String getTableName() {
     public List<Column> getTableColumns() throws AnalysisException {
         return schema;
     }
-
-    private TMetaScanRange createMetaScanRange(Split split) {
-        TMetaScanRange tMetaScanRange = new TMetaScanRange();
-        tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
-
-        TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams();
-        tPaimonMetadataParams.setCtlId(ctlId);
-        tPaimonMetadataParams.setDbId(dbId);
-        tPaimonMetadataParams.setTblId(tblId);
-        tPaimonMetadataParams.setQueryType(queryType);
-        tPaimonMetadataParams.setDbName(paimonTableName.getDb());
-        tPaimonMetadataParams.setTblName(paimonTableName.getTbl());
-        tPaimonMetadataParams.setHadoopProps(hadoopProps);
-        tPaimonMetadataParams.setPaimonProps(paimonProps);
-        tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split));
-
-        tMetaScanRange.setPaimonParams(tPaimonMetadataParams);
-        return tMetaScanRange;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
index e0a0d4dc649bc9..494a68edf3af9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
@@ -145,7 +145,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -156,7 +156,7 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
         partitionParam.setDatabase(databaseName);
         partitionParam.setTable(tableName);
         metaScanRange.setPartitionValuesParams(partitionParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
index 3ffb77cdbc6762..53f99c549a3636 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
@@ -43,7 +43,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -211,7 +210,7 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -225,7 +224,7 @@ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index 5d60adbeacfd45..b343a7dbeed547 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -31,7 +31,6 @@
 import org.apache.doris.thrift.TTasksMetadataParams;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -99,14 +98,14 @@ public TMetadataType getMetadataType() {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.TASKS);
         TTasksMetadataParams taskParam = new TTasksMetadataParams();
         taskParam.setType(jobType.name());
         taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setTasksParams(taskParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 3a10248b9fa005..cf4e88a0906a37 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -537,12 +537,13 @@ struct TDataGenScanRange {
 }
 
+// deprecated
 struct TIcebergMetadataParams {
   1: optional string serialized_task
   2: optional map<string, string> hadoop_props
 }
-
+// deprecated
 struct TPaimonMetadataParams {
   1: optional string db_name
   2: optional string tbl_name
@@ -612,7 +613,7 @@ struct TMetaCacheStatsParams {
 
 struct TMetaScanRange {
   1: optional Types.TMetadataType metadata_type
-  2: optional TIcebergMetadataParams iceberg_params
+  2: optional TIcebergMetadataParams iceberg_params // deprecated
   3: optional TBackendsMetadataParams backends_params
   4: optional TFrontendsMetadataParams frontends_params
   5: optional TQueriesMetadataParams queries_params
@@ -623,7 +624,12 @@ struct TMetaScanRange {
   10: optional TMetaCacheStatsParams meta_cache_stats_params
   11: optional TPartitionValuesMetadataParams partition_values_params
   12: optional THudiMetadataParams hudi_params
-  13: optional TPaimonMetadataParams paimon_params
+  13: optional TPaimonMetadataParams paimon_params // deprecated
+
+  // for querying sys tables for Paimon/Iceberg
+  14: optional map<string, string> hadoop_props
+  15: optional string serialized_table;
+  16: optional list<string> serialized_splits;
 }
 
 // Specification of an individual data range which is held in its entirety