5 changes: 2 additions & 3 deletions be/src/olap/parallel_scanner_builder.cpp
@@ -199,9 +199,8 @@ Status ParallelScannerBuilder::_load() {
 std::shared_ptr<NewOlapScanner> ParallelScannerBuilder::_build_scanner(
         BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
         TabletReader::ReadSource&& read_source) {
-    NewOlapScanner::Params params {
-            _state, _scanner_profile.get(), key_ranges, std::move(tablet),
-            version, std::move(read_source), _limit_per_scanner, _is_preaggregation};
+    NewOlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet),
+                                   version, std::move(read_source), _limit, _is_preaggregation};
     return NewOlapScanner::create_shared(_parent, std::move(params));
 }

6 changes: 3 additions & 3 deletions be/src/olap/parallel_scanner_builder.h
@@ -44,11 +44,11 @@ class ParallelScannerBuilder {
                            const std::vector<TabletWithVersion>& tablets,
                            const std::shared_ptr<RuntimeProfile>& profile,
                            const std::vector<OlapScanRange*>& key_ranges, RuntimeState* state,
-                           int64_t limit_per_scanner, bool is_dup_mow_key, bool is_preaggregation)
+                           int64_t limit, bool is_dup_mow_key, bool is_preaggregation)
             : _parent(parent),
               _scanner_profile(profile),
               _state(state),
-              _limit_per_scanner(limit_per_scanner),
+              _limit(limit),
               _is_dup_mow_key(is_dup_mow_key),
               _is_preaggregation(is_preaggregation),
               _tablets(tablets.cbegin(), tablets.cend()),
@@ -85,7 +85,7 @@ class ParallelScannerBuilder {
 
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeState* _state;
-    int64_t _limit_per_scanner;
+    int64_t _limit;
     bool _is_dup_mow_key;
     bool _is_preaggregation;
     std::vector<TabletWithVersion> _tablets;
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.cpp
@@ -94,7 +94,7 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
             properties, p._column_names, p._docvalue_context, &doc_value_mode);
 
     std::shared_ptr<vectorized::NewEsScanner> scanner = vectorized::NewEsScanner::create_shared(
-            RuntimeFilterConsumer::_state, this, p._limit_per_scanner, p._tuple_id, properties,
+            RuntimeFilterConsumer::_state, this, p._limit, p._tuple_id, properties,
             p._docvalue_context, doc_value_mode,
             RuntimeFilterConsumer::_state->runtime_profile());
 
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/file_scan_operator.cpp
@@ -44,8 +44,7 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
     for (int i = 0; i < _max_scanners; ++i) {
         std::unique_ptr<vectorized::VFileScanner> scanner = vectorized::VFileScanner::create_unique(
-                state(), this, p._limit_per_scanner, _split_source, _scanner_profile.get(),
-                _kv_cache.get());
+                state(), this, p._limit, _split_source, _scanner_profile.get(), _kv_cache.get());
         RETURN_IF_ERROR(
                 scanner->prepare(_conjuncts, &_colname_to_value_range, &p._colname_to_slot_id));
         scanners->push_back(std::move(scanner));
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -31,7 +31,7 @@ std::string JDBCScanLocalState::name_suffix() const {
 Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     auto& p = _parent->cast<JDBCScanOperatorX>();
     std::unique_ptr<vectorized::NewJdbcScanner> scanner = vectorized::NewJdbcScanner::create_unique(
-            state(), this, p._limit_per_scanner, p._tuple_id, p._query_string, p._table_type,
+            state(), this, p._limit, p._tuple_id, p._query_string, p._table_type,
             _scanner_profile.get());
     RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
     scanners->push_back(std::move(scanner));
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/meta_scan_operator.cpp
@@ -30,8 +30,7 @@ Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
 
     for (auto& scan_range : _scan_ranges) {
         std::shared_ptr<vectorized::VMetaScanner> scanner = vectorized::VMetaScanner::create_shared(
-                state(), this, p._tuple_id, scan_range, p._limit_per_scanner, profile(),
-                p._user_identity);
+                state(), this, p._tuple_id, scan_range, p._limit, profile(), p._user_identity);
         RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
         scanners->push_back(scanner);
     }
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/olap_scan_operator.cpp
@@ -306,8 +306,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
 
     ParallelScannerBuilder scanner_builder(this, tablets, _scanner_profile, key_ranges, state(),
-                                           p._limit_per_scanner, true,
-                                           p._olap_scan_node.is_preaggregation);
+                                           p._limit, true, p._olap_scan_node.is_preaggregation);
 
     int max_scanners_count = state()->parallel_scan_max_scanners_count();
 
@@ -345,7 +344,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                     std::move(tablet),
                     version,
                     {},
-                    p._limit_per_scanner,
+                    p._limit,
                     p._olap_scan_node.is_preaggregation,
             });
     RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
6 changes: 5 additions & 1 deletion be/src/vec/exec/scan/scanner_context.cpp
@@ -80,7 +80,11 @@ ScannerContext::ScannerContext(
                               : state->query_parallel_instance_num());
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
-
+    // 1. Calculate max concurrency.
+    // For a query like "select * from table limit 10;" a single scan thread is enough.
+    if (_local_state && _local_state->should_run_serial()) {
+        _max_thread_num = 1;
+    }
     // When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
     // because we found that in a table with 5k columns the column readers may occupy too much memory.
     // See https://github.com/apache/doris/issues/35340 for details.
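For reference, the concurrency clamp added in the hunk above can be summarized as a small standalone function. This is an illustrative sketch, not Doris code: `compute_max_thread_num` and `configured_threads` are made-up names, with `configured_threads` standing in for the value ScannerContext derives from config / query_parallel_instance_num before these clamps are applied.

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the clamping order shown above (assumed helper, not part of Doris).
int32_t compute_max_thread_num(int32_t configured_threads, int32_t scanner_count,
                               bool should_run_serial) {
    int32_t n = configured_threads;
    n = (n == 0) ? 1 : n;            // never start with zero threads
    n = std::min(n, scanner_count);  // no point in more threads than scanners
    if (should_run_serial) {
        n = 1;                       // e.g. "select * from table limit 10;"
    }
    return n;
}
```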