Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
for (int i = 0; i < _max_scanners; ++i) {
std::unique_ptr<vectorized::FileScanner> scanner = vectorized::FileScanner::create_unique(
state(), this, p._limit, _split_source, _scanner_profile.get(), _kv_cache.get(),
&_colname_to_value_range, &p._colname_to_slot_id);
&p._colname_to_slot_id);
RETURN_IF_ERROR(scanner->init(state(), _conjuncts));
scanners->push_back(std::move(scanner));
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ Status AvroJNIReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
return Status::OK();
}

Status AvroJNIReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
Status AvroJNIReader::init_reader() {
std::ostringstream required_fields;
std::ostringstream columns_types;
std::vector<std::string> column_names;
Expand Down Expand Up @@ -97,7 +95,7 @@ Status AvroJNIReader::init_reader(
required_param.insert(std::make_pair("uri", _range.path));
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
required_param, column_names);
RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/avro/avro_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class AvroJNIReader : public JniReader {
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

TFileType::type get_file_type() const;

Expand All @@ -81,7 +80,6 @@ class AvroJNIReader : public JniReader {
private:
const TFileScanRangeParams _params;
const TFileRangeDesc _range;
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
};

#include "common/compile_check_end.h"
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs
params, column_names);
}

Status MockJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status MockJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}
#include "common/compile_check_end.h"
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/exec/format/jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ class MockJniReader : public JniReader {

~MockJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

Status close() override {
if (_jni_connector) {
Expand All @@ -110,9 +109,6 @@ class MockJniReader : public JniReader {
_jni_connector->collect_profile_before_close();
}
}

private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
};

#include "common/compile_check_end.h"
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/table/hudi_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
params, required_fields);
}

Status HudiJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status HudiJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}
#include "common/compile_check_end.h"
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/table/hudi_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,11 @@ class HudiJniReader : public JniReader {

~HudiJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const TFileScanRangeParams& _scan_params;
const THudiFileDesc& _hudi_params;
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
};

#include "common/compile_check_end.h"
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ IcebergSysTableJniReader::IcebergSysTableJniReader(
RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
: JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {}

Status IcebergSysTableJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
Status IcebergSysTableJniReader::init_reader() {
std::vector<std::string> required_fields;
std::vector<std::string> required_types;
for (const auto& desc : _file_slot_descs) {
Expand All @@ -53,7 +52,7 @@ Status IcebergSysTableJniReader::init_reader(
if (_jni_connector == nullptr) {
return Status::InternalError("JniConnector failed to initialize");
}
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class IcebergSysTableJniReader : public JniReader {

~IcebergSysTableJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const TMetaScanRange& _meta_scan_range;
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
params, required_fields);
}

Status LakeSoulJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status LakeSoulJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}
#include "common/compile_check_end.h"
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/format/table/lakesoul_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class LakeSoulJniReader : public JniReader {

~LakeSoulJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const TLakeSoulFileDesc& _lakesoul_params;
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
}

Status MaxComputeJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status MaxComputeJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}
#include "common/compile_check_end.h"
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/table/max_compute_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ class MaxComputeJniReader : public JniReader {

~MaxComputeJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const MaxComputeTableDescriptor* _table_desc = nullptr;
const TMaxComputeFileDesc& _max_compute_params;
const TFileRangeDesc& _range;
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
};

#include "common/compile_check_end.h"
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,8 @@ Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eo
return _jni_connector->get_next_block(block, read_rows, eof);
}

Status PaimonJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status PaimonJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}
#include "common/compile_check_end.h"
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/table/paimon_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ class PaimonJniReader : public JniReader {

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
int64_t _remaining_table_level_row_count;
};

Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
"org/apache/doris/paimon/PaimonSysTableJniScanner", std::move(params), required_fields);
}

Status PaimonSysTableJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status PaimonSysTableJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
return _jni_connector->open(_state, _profile);
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ class PaimonSysTableJniReader : public JniReader {

~PaimonSysTableJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
const TMetaScanRange& _meta_scan_range;
};

Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ TrinoConnectorJniReader::TrinoConnectorJniReader(
"org/apache/doris/trinoconnector/TrinoConnectorJniScanner", params, column_names);
}

Status TrinoConnectorJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
Status TrinoConnectorJniReader::init_reader() {
RETURN_IF_ERROR(_jni_connector->init());
RETURN_IF_ERROR(_set_spi_plugins_dir());
return _jni_connector->open(_state, _profile);
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/format/table/trino_connector_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class TrinoConnectorJniReader : public JniReader {

~TrinoConnectorJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init_reader();

private:
Status _set_spi_plugins_dir();
Expand Down
25 changes: 1 addition & 24 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
return Status::OK();
}

Status JniConnector::init(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
// TODO: This logic need to be changed.
// See the comment of "predicates" field in JniScanner.java

// _generate_predicates(colname_to_value_range);
// if (_predicates_length != 0 && _predicates != nullptr) {
// int64_t predicates_address = (int64_t)_predicates.get();
// // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
// // serialized predicates in java side.
// _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
// }
Status JniConnector::init() {
return Status::OK();
}

Expand Down Expand Up @@ -501,18 +490,6 @@ Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColum
return Status::OK();
}

void JniConnector::_generate_predicates(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
if (colname_to_value_range == nullptr) {
return;
}
for (auto& kv : *colname_to_value_range) {
const std::string& column_name = kv.first;
const ColumnValueRangeType& col_val_range = kv.second;
std::visit([&](auto&& range) { _parse_value_range(range, column_name); }, col_val_range);
}
}

std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
DataTypePtr type = remove_nullable(data_type);
std::ostringstream buffer;
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ class JniConnector : public ProfileCollector {
* number_filters(4) | length(4) | column_name | op(4) | scale(4) | num_values(4) | value_length(4) | value | ...
* Then, pass the byte array address in configuration map, like "push_down_predicates=${address}"
*/
Status init(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
Status init();

/**
* Call java side function JniScanner.getNextBatchMeta. The columns information are stored as long array:
Expand Down Expand Up @@ -364,9 +363,6 @@ class JniConnector : public ProfileCollector {
return (long)assert_cast<const COLUMN_TYPE&>(doris_column).get_data().data();
}

void _generate_predicates(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

template <PrimitiveType primitive_type>
void _parse_value_range(const ColumnValueRange<primitive_type>& col_val_range,
const std::string& column_name) {
Expand Down
Loading
Loading