Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop.";

IcebergSysTableJniReader::IcebergSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
: JniReader(file_slot_descs, state, profile), _range_params(range_params) {}
RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
: JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {}

Status IcebergSysTableJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
Expand All @@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader(
required_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type()));
}
std::map<std::string, std::string> params;
params["serialized_task"] = _range_params.serialized_task;
// "," is not in base64
params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");
params["time_zone"] = _state->timezone_obj().name();
for (const auto& kv : _range_params.hadoop_props) {
params["time_zone"] = _state->timezone();
for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}
_jni_connector =
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class IcebergSysTableJniReader : public JniReader {
public:
IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TIcebergMetadataParams& range_params);
const TMetaScanRange& meta_scan_range);

~IcebergSysTableJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

private:
const TIcebergMetadataParams& _range_params;
const TMetaScanRange& _meta_scan_range;
};

#include "common/compile_check_end.h"
Expand Down
8 changes: 0 additions & 8 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,8 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
column_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type()));
}
std::map<String, String> params;
params["db_name"] = range.table_format_params.paimon_params.db_name;
params["table_name"] = range.table_format_params.paimon_params.table_name;
params["paimon_split"] = range.table_format_params.paimon_params.paimon_split;
params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names;
params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate;
params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id);
params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id);
params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id);
params["last_update_time"] =
std::to_string(range.table_format_params.paimon_params.last_update_time);
params["required_fields"] = join(column_names, ",");
params["columns_types"] = join(column_types, "#");
params["time_zone"] = _state->timezone();
Expand Down
23 changes: 7 additions & 16 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"

const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";

PaimonSysTableJniReader::PaimonSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
: JniReader(file_slot_descs, state, profile), _range_params(range_params) {
RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
: JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {
std::vector<std::string> required_fields;
std::vector<std::string> required_types;
for (const auto& desc : _file_slot_descs) {
Expand All @@ -38,21 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
}

std::map<std::string, std::string> params;
params["db_name"] = _range_params.db_name;
params["tbl_name"] = _range_params.tbl_name;
params["query_type"] = _range_params.query_type;
params["ctl_id"] = std::to_string(_range_params.ctl_id);
params["db_id"] = std::to_string(_range_params.db_id);
params["tbl_id"] = std::to_string(_range_params.tbl_id);
params["serialized_split"] = _range_params.serialized_split;
params["serialized_table"] = _meta_scan_range.serialized_table;
// "," is not in base64
params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");

for (const auto& kv : _range_params.paimon_props) {
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
}

for (const auto& kv : _range_params.hadoop_props) {
params["time_zone"] = _state->timezone();
for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader {

public:
static const std::string HADOOP_OPTION_PREFIX;
static const std::string PAIMON_OPTION_PREFIX;
PaimonSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TPaimonMetadataParams& range_params);
const TMetaScanRange& meta_scan_range);

~PaimonSysTableJniReader() override = default;

Expand All @@ -58,7 +57,7 @@ class PaimonSysTableJniReader : public JniReader {

private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
const TPaimonMetadataParams& _range_params;
const TMetaScanRange& _meta_scan_range;
};

#include "common/compile_check_end.h"
Expand Down
31 changes: 4 additions & 27 deletions be/src/vec/exec/scan/meta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ Status MetaScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(Scanner::open(state));
if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
// TODO: refactor this code
auto reader = IcebergSysTableJniReader::create_unique(
_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params);
auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
_scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
} else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) {
auto reader = PaimonSysTableJniReader::create_unique(
_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params);
auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
_scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
Expand Down Expand Up @@ -242,9 +242,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
VLOG_CRITICAL << "MetaScanner::_fetch_metadata";
TFetchSchemaTableDataRequest request;
switch (meta_scan_range.metadata_type) {
case TMetadataType::ICEBERG:
RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::HUDI:
RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request));
break;
Expand Down Expand Up @@ -310,26 +307,6 @@ Status MetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
return Status::OK();
}

Status MetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "MetaScanner::_build_iceberg_metadata_request";
if (!meta_scan_range.__isset.iceberg_params) {
return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range.");
}

// create request
request->__set_cluster_name("");
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status MetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "MetaScanner::_build_hudi_metadata_request";
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exec/scan/meta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class MetaScanner : public Scanner {
private:
Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns);
Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import com.google.common.base.Preconditions;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterator;
Expand All @@ -34,6 +35,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
Expand All @@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner {
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
private final ClassLoader classLoader;
private final PreExecutionAuthenticator preExecutionAuthenticator;
private final FileScanTask scanTask;
private final Iterator<FileScanTask> scanTasks;
private final List<NestedField> fields;
private final String timezone;
private CloseableIterator<StructLike> reader;

public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
List<FileScanTask> scanTasks = Arrays.stream(params.get("serialized_splits").split(","))
.map(SerializationUtil::deserializeFromBase64)
.map(obj -> (FileScanTask) obj)
.collect(Collectors.toList());
Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks should not be empty");
this.scanTasks = scanTasks.iterator();
String[] requiredFields = params.get("required_fields").split(",");
this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields);
this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
Map<String, String> hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
Expand All @@ -69,28 +77,34 @@ public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {

@Override
public void open() throws IOException {
Thread.currentThread().setContextClassLoader(classLoader);
nextScanTask();
}

private void nextScanTask() throws IOException {
Preconditions.checkArgument(scanTasks.hasNext());
FileScanTask scanTask = scanTasks.next();
try {
Thread.currentThread().setContextClassLoader(classLoader);
preExecutionAuthenticator.execute(() -> {
// execute FileScanTask to get rows
reader = scanTask.asDataTask().rows().iterator();
return null;
});
} catch (Exception e) {
this.close();
String msg = String.format("Failed to open IcebergMetadataJniScanner");
String msg = String.format("Failed to open next scan task: %s", scanTask);
LOG.error(msg, e);
throw new IOException(msg, e);
}
}

@Override
protected int getNext() throws IOException {
if (reader == null) {
return 0;
}
int rows = 0;
while (reader.hasNext() && rows < getBatchSize()) {
while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) {
if (!reader.hasNext()) {
nextScanTask();
}
StructLike row = reader.next();
for (int i = 0; i < fields.size(); i++) {
NestedField field = fields.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@

public class PaimonJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
@Deprecated
private static final String PAIMON_OPTION_PREFIX = "paimon.";
@Deprecated
private static final String HADOOP_OPTION_PREFIX = "hadoop.";

private final Map<String, String> params;
@Deprecated
private final Map<String, String> hadoopOptionParams;
private final String paimonSplit;
private final String paimonPredicate;
Expand All @@ -59,7 +55,6 @@ public class PaimonJniScanner extends JniScanner {
private final PaimonColumnValue columnValue = new PaimonColumnValue();
private List<String> paimonAllFieldNames;
private List<DataType> paimonDataTypeList;
private final String timeZone;
private RecordReader.RecordIterator<InternalRow> recordIterator = null;
private final ClassLoader classLoader;
private PreExecutionAuthenticator preExecutionAuthenticator;
Expand All @@ -78,7 +73,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
}
paimonSplit = params.get("paimon_split");
paimonPredicate = params.get("paimon_predicate");
this.timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
columnValue.setTimeZone(timeZone);
initTableInfo(columnTypes, requiredFields, batchSize);
hadoopOptionParams = params.entrySet().stream()
Expand Down
Loading
Loading