diff --git a/be/src/common/config.h b/be/src/common/config.h index ae2c83801da0f7..4e2ab0c54f30ec 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -189,6 +189,9 @@ namespace config { CONF_mInt32(doris_scanner_row_num, "16384"); // number of max scan keys CONF_mInt32(doris_max_scan_key_num, "1024"); + // the max number of push down values of a single column. + // if exceed, no conditions will be pushed down for that column. + CONF_mInt32(max_pushdown_conditions_per_column, "1024"); // return_row / total_row CONF_mInt32(doris_max_pushdown_conjuncts_return_rate, "90"); // (Advanced) Maximum size of per-query receive-side buffer diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index 17adb16a7fb290..1a52603fdfed33 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -186,7 +186,7 @@ class OlapScanKeys { _is_convertible(true) {} template - Status extend_scan_key(ColumnValueRange& range); + Status extend_scan_key(ColumnValueRange& range, int32_t max_scan_key_num); Status get_key_range(std::vector>* key_range); @@ -615,7 +615,7 @@ bool ColumnValueRange::has_intersection(ColumnValueRange& range) { } template -Status OlapScanKeys::extend_scan_key(ColumnValueRange& range) { +Status OlapScanKeys::extend_scan_key(ColumnValueRange& range, int32_t max_scan_key_num) { using namespace std; typedef typename set::const_iterator const_iterator_type; @@ -636,8 +636,8 @@ Status OlapScanKeys::extend_scan_key(ColumnValueRange& range) { bool has_converted = false; if (range.is_fixed_value_range()) { - if ((_begin_scan_keys.empty() && range.get_fixed_value_size() > config::doris_max_scan_key_num) - || range.get_fixed_value_size() * _begin_scan_keys.size() > config::doris_max_scan_key_num) { + if ((_begin_scan_keys.empty() && range.get_fixed_value_size() > max_scan_key_num) + || range.get_fixed_value_size() * _begin_scan_keys.size() > max_scan_key_num) { if (range.is_range_value_convertible()) { range.convert_to_range_value(); } else 
{ @@ -647,7 +647,7 @@ Status OlapScanKeys::extend_scan_key(ColumnValueRange& range) { } else { if (range.is_fixed_value_convertible() && _is_convertible) { if (_begin_scan_keys.empty()) { - if (range.get_convertible_fixed_value_size() < config::doris_max_scan_key_num) { + if (range.get_convertible_fixed_value_size() < max_scan_key_num) { if (range.is_low_value_mininum() && range.is_high_value_maximum()) { has_converted = true; } @@ -656,7 +656,7 @@ Status OlapScanKeys::extend_scan_key(ColumnValueRange& range) { } } else { if (range.get_convertible_fixed_value_size() * _begin_scan_keys.size() - < config::doris_max_scan_key_num) { + < max_scan_key_num) { if (range.is_low_value_mininum() && range.is_high_value_maximum()) { has_converted = true; } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 8535bfec781e7a..db3fe88a1b00ca 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -72,27 +72,23 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _direct_conjunct_size = _conjunct_ctxs.size(); + const TQueryOptions& query_options = state->query_options(); + if (query_options.__isset.max_scan_key_num) { + _max_scan_key_num = query_options.max_scan_key_num; + } else { + _max_scan_key_num = config::doris_max_scan_key_num; + } + + if (query_options.__isset.max_pushdown_conditions_per_column) { + _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; + } else { + _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; + } + return Status::OK(); } void OlapScanNode::_init_counter(RuntimeState* state) { -#if 0 - ADD_TIMER(profile, "GetTabletTime"); - ADD_TIMER(profile, "InitReaderTime"); - ADD_TIMER(profile, "ShowHintsTime"); - ADD_TIMER(profile, "BlockLoadTime"); - ADD_TIMER(profile, "IndexLoadTime"); - ADD_TIMER(profile, "VectorPredicateEvalTime"); - ADD_TIMER(profile, "ScannerTimer"); - 
ADD_TIMER(profile, "IOTimer"); - ADD_TIMER(profile, "DecompressorTimer"); - ADD_TIMER(profile, "RLETimer"); - - ADD_COUNTER(profile, "RawRowsRead", TUnit::UNIT); - ADD_COUNTER(profile, "IndexStreamCacheMiss", TUnit::UNIT); - ADD_COUNTER(profile, "IndexStreamCacheHit", TUnit::UNIT); - ADD_COUNTER(profile, "BlockLoadCount", TUnit::UNIT); -#endif ADD_TIMER(_runtime_profile, "ShowHintsTime"); _reader_init_timer = ADD_TIMER(_runtime_profile, "ReaderInitTime"); @@ -134,6 +130,8 @@ void OlapScanNode::_init_counter(RuntimeState* state) { _bitmap_index_filter_counter = ADD_COUNTER(_runtime_profile, "BitmapIndexFilterCount", TUnit::UNIT); _bitmap_index_filter_timer = ADD_TIMER(_runtime_profile, "BitmapIndexFilterTimer"); + + _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); } Status OlapScanNode::prepare(RuntimeState* state) { @@ -545,7 +543,7 @@ Status OlapScanNode::build_scan_key() { break; } - ExtendScanKeyVisitor visitor(_scan_keys); + ExtendScanKeyVisitor visitor(_scan_keys, _max_scan_key_num); RETURN_IF_ERROR(boost::apply_visitor(visitor, column_range_iter->second)); } @@ -694,10 +692,11 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges); _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); - disk_set.insert(scanner->scan_disk()); + disk_set.insert(scanner->scan_disk()); } } COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); + COUNTER_SET(_num_scanners, static_cast(_olap_scanners.size())); // init progress std::stringstream ss; @@ -715,10 +714,10 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { template Status OlapScanNode::normalize_predicate(ColumnValueRange& range, SlotDescriptor* slot) { // 1. Normalize InPredicate, add to ColumnValueRange - RETURN_IF_ERROR(normalize_in_predicate(slot, &range)); + RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range)); // 2. 
Normalize BinaryPredicate , add to ColumnValueRange - RETURN_IF_ERROR(normalize_binary_predicate(slot, &range)); + RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range)); // 3. Add range to Column->ColumnValueRange map _column_value_ranges[slot->col_name()] = range; @@ -736,100 +735,114 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { return false; } +// Construct the ColumnValueRange for one specified column +// It will only handle the InPredicate and eq BinaryPredicate in _conjunct_ctxs. +// It will try to push down conditions of that column as much as possible, +// But if the number of conditions exceeds the limit, none of conditions will be pushed down. template -Status OlapScanNode::normalize_in_predicate(SlotDescriptor* slot, ColumnValueRange* range) { +Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range) { + bool meet_eq_binary = false; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) { InPredicate* pred = dynamic_cast(_conjunct_ctxs[conj_idx]->root()); if (pred->is_not_in()) { + // can not push down NOT IN predicate to storage engine continue; } if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + + std::vector slot_ids; + if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) { + // not a single column predicate + continue; + } + + if (slot_ids[0] != slot->id()) { + // predicate not related to current column continue; } if (pred->get_child(0)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(0))) { + // the type of predicate not match the slot's type continue; } } - if (pred->is_not_in()) { + VLOG(1) << slot->col_name() << " fixed_values add num: " + << pred->hybird_set()->size(); + + // if there are too many elements in InPredicate, exceed the 
limit, + // we will not push any condition of this column to storage engine. + // because too many conditions pushed down to storage engine may even + // slow down the query process. + // ATTN: This is just an experience value. You may need to try + // different thresholds to improve performance. + if (pred->hybird_set()->size() > _max_pushdown_conditions_per_column) { + VLOG(3) << "Predicate value num " << pred->hybird_set()->size() + << " excede limit " << _max_pushdown_conditions_per_column; continue; } - std::vector slot_ids; - if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) { - // 1.1 Skip if slot not match conjunct - if (slot_ids[0] != slot->id()) { - continue; + // begin to push InPredicate value into ColumnValueRange + HybirdSetBase::IteratorBase* iter = pred->hybird_set()->begin(); + while (iter->has_next()) { + // column in (NULL,...) counldn't push down to StorageEngine + // so that discard whole ColumnValueRange + if (NULL == iter->get_value()) { + range->clear(); + break; } - VLOG(1) << slot->col_name() << " fixed_values add num: " - << pred->hybird_set()->size(); - - // 1.2 Skip if InPredicate value size larger then max_scan_key_num - if (pred->hybird_set()->size() > config::doris_max_scan_key_num) { - VLOG(3) << "Predicate value num " << pred->hybird_set()->size() - << " excede limit " << config::doris_max_scan_key_num; - continue; + switch (slot->type().type) { + case TYPE_TINYINT: { + int32_t v = *reinterpret_cast(const_cast(iter->get_value())); + range->add_fixed_value(*reinterpret_cast(&v)); + break; } - - // 1.3 Push InPredicate value into ColumnValueRange - HybirdSetBase::IteratorBase* iter = pred->hybird_set()->begin(); - while (iter->has_next()) { - // column in (NULL,...) 
counldn't push down to StorageEngine - // so that discard whole ColumnValueRange - if (NULL == iter->get_value()) { - range->clear(); - break; - } - - switch (slot->type().type) { - case TYPE_TINYINT: { - int32_t v = *reinterpret_cast(const_cast(iter->get_value())); - range->add_fixed_value(*reinterpret_cast(&v)); - break; - } - case TYPE_DATE: { - DateTimeValue date_value = - *reinterpret_cast(iter->get_value()); - date_value.cast_to_date(); - range->add_fixed_value(*reinterpret_cast(&date_value)); - break; - } - case TYPE_DECIMAL: - case TYPE_DECIMALV2: - case TYPE_LARGEINT: - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_DATETIME: { - range->add_fixed_value(*reinterpret_cast(const_cast(iter->get_value()))); - break; - } - case TYPE_BOOLEAN: { - bool v = *reinterpret_cast(const_cast(iter->get_value())); - range->add_fixed_value(*reinterpret_cast(&v)); - break; - } - default: { - break; - } - } - iter->next(); + case TYPE_DATE: { + DateTimeValue date_value = + *reinterpret_cast(iter->get_value()); + date_value.cast_to_date(); + range->add_fixed_value(*reinterpret_cast(&date_value)); + break; + } + case TYPE_DECIMAL: + case TYPE_DECIMALV2: + case TYPE_LARGEINT: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_HLL: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_DATETIME: { + range->add_fixed_value(*reinterpret_cast(const_cast(iter->get_value()))); + break; + } + case TYPE_BOOLEAN: { + bool v = *reinterpret_cast(const_cast(iter->get_value())); + range->add_fixed_value(*reinterpret_cast(&v)); + break; } + default: { + break; + } + } + iter->next(); } - } + + } // end of handle in predicate // 2. 
Normalize eq conjuncts like 'where col = value' if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() && FILTER_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) { + Expr* pred = _conjunct_ctxs[conj_idx]->root(); DCHECK(pred->get_num_children() == 2); @@ -838,30 +851,42 @@ Status OlapScanNode::normalize_in_predicate(SlotDescriptor* slot, ColumnValueRan != TExprNodeType::SLOT_REF) { continue; } + + std::vector slot_ids; + if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) { + // not a single column predicate + continue; + } + + if (slot_ids[0] != slot->id()) { + // predicate not related to current column + continue; + } + if (pred->get_child(child_idx)->type().type != slot->type().type) { if (!ignore_cast(slot, pred->get_child(child_idx))) { + // the type of predicate not match the slot's type continue; } } - std::vector slot_ids; - if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) { - if (slot_ids[0] != slot->id()) { - continue; - } - - Expr* expr = pred->get_child(1 - child_idx); - if (!expr->is_constant()) { - continue; - } + Expr* expr = pred->get_child(1 - child_idx); + if (!expr->is_constant()) { + // only handle constant value + continue; + } - void* value = _conjunct_ctxs[conj_idx]->get_value(expr, NULL); - // for case: where col = null - if (value == NULL) { - continue; - } + void* value = _conjunct_ctxs[conj_idx]->get_value(expr, NULL); + // for case: where col = null + if (value == NULL) { + continue; + } - switch (slot->type().type) { + // begin to push condition value into ColumnValueRange + // clear the ColumnValueRange before adding new fixed values. 
+ // because for AND compound predicates, it can overwrite previous conditions + range->clear(); + switch (slot->type().type) { case TYPE_TINYINT: { int32_t v = *reinterpret_cast(value); range->add_fixed_value(*reinterpret_cast(&v)); @@ -894,15 +919,42 @@ Status OlapScanNode::normalize_in_predicate(SlotDescriptor* slot, ColumnValueRan } default: { LOG(WARNING) << "Normalize filter fail, Unsupport Primitive type. [type=" - << expr->type() << "]"; + << expr->type() << "]"; return Status::InternalError("Normalize filter fail, Unsupport Primitive type"); } - } } + + meet_eq_binary = true; + } // end for each binary predicate child + } // end of handling eq binary predicate + + if (range->get_fixed_value_size() > 0) { + // this columns already meet some eq predicates(IN or Binary), + // There is no need to continue to iterate. + // TODO(cmy): In fact, this part of the judgment should be completed in + // the FE query planning stage. For the following predicate conditions, + // it should be possible to eliminate at the FE side. + // WHERE A = 1 and A in (2,3,4) + + if (meet_eq_binary) { + // meet_eq_binary is true, means we meet at least one eq binary predicate. + // this flag is to handle following case: + // There are 2 conjuncts, first in a InPredicate, and second is a BinaryPredicate. + // Firstly, we met a InPredicate, and add lots of values in ColumnValueRange, + // if breaks, doris will read many rows filtered by these values. + // But if continue to handle the BinaryPredicate, the value in ColumnValueRange + // may become only one, which can reduce the rows read from storage engine. + // So the strategy is to use the BinaryPredicate as much as possible. + break; } - } + } + } + if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) { + // exceed limit, no conditions will be pushed down to storage engine. 
+ range->clear(); + } return Status::OK(); } @@ -928,7 +980,7 @@ void OlapScanNode::construct_is_null_pred_in_where_pred(Expr* expr, SlotDescript } template -Status OlapScanNode::normalize_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range) { +Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range) { for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { Expr *root_expr = _conjunct_ctxs[conj_idx]->root(); if (TExprNodeType::BINARY_PRED != root_expr->node_type() @@ -996,7 +1048,7 @@ Status OlapScanNode::normalize_binary_predicate(SlotDescriptor* slot, ColumnValu case TYPE_DECIMALV2: case TYPE_CHAR: case TYPE_VARCHAR: - case TYPE_HLL: + case TYPE_HLL: case TYPE_DATETIME: case TYPE_SMALLINT: case TYPE_INT: diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 11333a30397631..c91eb34a8c32b1 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -84,13 +84,16 @@ class OlapScanNode : public ScanNode { class ExtendScanKeyVisitor : public boost::static_visitor { public: - ExtendScanKeyVisitor(OlapScanKeys& scan_keys) : _scan_keys(scan_keys) { } + ExtendScanKeyVisitor(OlapScanKeys& scan_keys, int32_t max_scan_key_num) + : _scan_keys(scan_keys), + _max_scan_key_num(max_scan_key_num) { } template Status operator()(T& v) { - return _scan_keys.extend_scan_key(v); + return _scan_keys.extend_scan_key(v, _max_scan_key_num); } private: OlapScanKeys& _scan_keys; + int32_t _max_scan_key_num; }; typedef boost::variant> string_list; @@ -143,10 +146,10 @@ class OlapScanNode : public ScanNode { Status normalize_predicate(ColumnValueRange& range, SlotDescriptor* slot); template - Status normalize_in_predicate(SlotDescriptor* slot, ColumnValueRange* range); + Status normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range); template - Status normalize_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range); + Status 
normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range); void transfer_thread(RuntimeState* state); void scanner_thread(OlapScanner* scanner); @@ -243,6 +246,18 @@ class OlapScanNode : public ScanNode { bool _need_agg_finalize = true; + // the max num of scan keys of this scan request. + // it will set as BE's config `doris_max_scan_key_num`, + // or be overwritten by value in TQueryOptions + int32_t _max_scan_key_num = 1024; + // The max number of conditions in InPredicate that can be pushed down + // into OlapEngine. + // If conditions in InPredicate is larger than this, all conditions in + // InPredicate will not be pushed to the OlapEngine. + // it will set as BE's config `max_pushdown_conditions_per_column`, + // or be overwritten by value in TQueryOptions + int32_t _max_pushdown_conditions_per_column = 1024; + // Counters RuntimeProfile::Counter* _io_timer = nullptr; RuntimeProfile::Counter* _read_compressed_counter = nullptr; @@ -277,6 +292,8 @@ class OlapScanNode : public ScanNode { RuntimeProfile::Counter* _bitmap_index_filter_counter = nullptr; // time fro bitmap inverted index read and filter RuntimeProfile::Counter* _bitmap_index_filter_timer = nullptr; + // number of created olap scanners + RuntimeProfile::Counter* _num_scanners = nullptr; }; } // namespace doris diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 36d45ad56fef73..37333eb3c66ae2 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -129,6 +129,12 @@ Since this is a brpc configuration, users can also modify this parameter directl ### `doris_max_scan_key_num` +* Type: int +* Description: Used to limit the maximum number of scan keys that a scan node can split in a query request. 
When a conditional query request reaches the scan node, the scan node will try to split the conditions related to the key column in the query condition into multiple scan key ranges. After that, these scan key ranges will be assigned to multiple scanner threads for data scanning. A larger value usually means that more scanner threads can be used to increase the parallelism of the scanning operation. However, in high concurrency scenarios, too many threads may bring greater scheduling overhead and system load, and will slow down the query response speed. An empirical value is 50. This configuration can be configured separately at the session level. For details, please refer to the description of `max_scan_key_num` in [Variables](../variables.md). +* Default value: 1024 + +When the concurrency cannot be improved in high concurrency scenarios, try to reduce this value and observe the impact. + ### `doris_scan_range_row_count` ### `doris_scanner_queue_size` @@ -242,6 +248,20 @@ Indicates how many tablets in this data directory failed to load. At the same ti ### `max_percentage_of_error_disk` +### `max_pushdown_conditions_per_column` + +* Type: int +* Description: Used to limit the maximum number of conditions that can be pushed down to the storage engine for a single column in a query request. During the execution of the query plan, the filter conditions on some columns can be pushed down to the storage engine, so that the index information in the storage engine can be used for data filtering, reducing the amount of data that needs to be scanned by the query. Examples are equality conditions and the conditions in IN predicates. In most cases, this parameter only affects queries containing IN predicates. For example, `WHERE colA IN (1,2,3,4, ...)`. A larger number means that more conditions in the IN predicate can be pushed to the storage engine, but too many conditions may cause an increase in random reads, and in some cases may reduce query efficiency.
This configuration can be configured separately at the session level. For details, please refer to the description of `max_pushdown_conditions_per_column` in [Variables](../variables.md). +* Default value: 1024 + +* Example + + The table structure is `id INT, col2 INT, col3 varchar(32), ...`. + + The query is `... WHERE id IN (v1, v2, v3, ...)` + + If the number of conditions in the IN predicate exceeds the configuration, try to increase the configuration value and observe whether the query response has improved. + ### `max_runnings_transactions_per_txn_map` ### `max_tablet_num_per_shard` @@ -422,4 +442,4 @@ If the system is found to be in a high-stress scenario and a large number of thr ### `webserver_port` -### `write_buffer_size` \ No newline at end of file +### `write_buffer_size` diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 2333f453cbb501..b531008236d79a 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -235,6 +235,14 @@ SET forward_to_master = concat('tr', 'u', 'e'); Used for compatible JDBC connection pool C3P0. No practical effect. +* `max_pushdown_conditions_per_column` + + For the specific meaning of this variable, please refer to the description of `max_pushdown_conditions_per_column` in [BE Configuration](./config/be_config.md). This variable is set to -1 by default, which means that the configuration value in `be.conf` is used. If the setting is greater than 0, the query in the current session will use the variable value, and ignore the configuration value in `be.conf`. + +* `max_scan_key_num` + + For the specific meaning of this variable, please refer to the description of `doris_max_scan_key_num` in [BE Configuration](./config/be_config.md). This variable is set to -1 by default, which means that the configuration value in `be.conf` is used.
If the setting is greater than 0, the query in the current session will use the variable value, and ignore the configuration value in `be.conf`. + * `net_buffer_length` Used for compatibility with MySQL clients. No practical effect. diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 63d36069e20281..7c9731ddf6e7a3 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -127,6 +127,12 @@ under the License. ### `doris_max_scan_key_num` +* 类型:int +* 描述:用于限制一个查询请求中,scan node 节点能拆分的最大 scan key 的个数。当一个带有条件的查询请求到达 scan node 节点时,scan node 会尝试将查询条件中 key 列相关的条件拆分成多个 scan key range。之后这些 scan key range 会被分配给多个 scanner 线程进行数据扫描。较大的数值通常意味着可以使用更多的 scanner 线程来提升扫描操作的并行度。但在高并发场景下,过多的线程可能会带来更大的调度开销和系统负载,反而会降低查询响应速度。一个经验数值为 50。该配置可以单独进行会话级别的配置,具体可参阅 [变量](../variables.md) 中 `max_scan_key_num` 的说明。 +* 默认值:1024 + +当在高并发场景下发现并发度无法提升时,可以尝试降低该数值并观察影响。 + ### `doris_scan_range_row_count` ### `doris_scanner_queue_size` @@ -223,6 +229,20 @@ under the License. ### `max_percentage_of_error_disk` +### `max_pushdown_conditions_per_column` + +* 类型:int +* 描述:用于限制一个查询请求中,针对单个列,能够下推到存储引擎的最大条件数量。在查询计划执行的过程中,一些列上的过滤条件可以下推到存储引擎,这样可以利用存储引擎中的索引信息进行数据过滤,减少查询需要扫描的数据量。比如等值条件、IN 谓词中的条件等。这个参数在绝大多数情况下仅影响包含 IN 谓词的查询。如 `WHERE colA IN (1,2,3,4,...)`。较大的数值意味着 IN 谓词中更多的条件可以推送给存储引擎,但过多的条件可能会导致随机读的增加,某些情况下可能会降低查询效率。该配置可以单独进行会话级别的配置,具体可参阅 [变量](../variables.md) 中 `max_pushdown_conditions_per_column` 的说明。 +* 默认值:1024 + +* 示例 + + 表结构为 `id INT, col2 INT, col3 varchar(32), ...`。 + + 查询请求为 `... WHERE id IN (v1, v2, v3, ...)` + + 如果 IN 谓词中的条件数量超过了该配置,则可以尝试增加该配置值,观察查询响应是否有所改善。 + ### `max_runnings_transactions_per_txn_map` ### `max_tablet_num_per_shard` @@ -409,7 +429,7 @@ under the License.
* 类型:布尔 * 描述:用来决定在有tablet 加载失败的情况下是否忽略错误,继续启动be -* 默认值: false +* 默认值:false BE启动时,会对每个数据目录单独启动一个线程进行 tablet header 元信息的加载。默认配置下,如果某个数据目录有 tablet 加载失败,则启动进程会终止。同时会在 `be.INFO` 日志中看到如下错误信息: diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index d728a38ed542c9..af0001176d4c07 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -234,6 +234,14 @@ SET forward_to_master = concat('tr', 'u', 'e'); 用于兼容 JDBC 连接池 C3P0。 无实际作用。 +* `max_pushdown_conditions_per_column` + + 该变量的具体含义请参阅 [BE 配置项](./config/be_config.md) 中 `max_pushdown_conditions_per_column` 的说明。该变量默认置为 -1,表示使用 `be.conf` 中的配置值。如果设置大于 0,则当前会话中的查询会使用该变量值,而忽略 `be.conf` 中的配置值。 + +* `max_scan_key_num` + + 该变量的具体含义请参阅 [BE 配置项](./config/be_config.md) 中 `doris_max_scan_key_num` 的说明。该变量默认置为 -1,表示使用 `be.conf` 中的配置值。如果设置大于 0,则当前会话中的查询会使用该变量值,而忽略 `be.conf` 中的配置值。 + * `net_buffer_length` 用于兼容 MySQL 客户端。无实际作用。 diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 83ef1876644dd9..de77ebb07f3932 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -46,6 +46,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; +import org.apache.doris.thrift.TQueryOptions; import com.google.common.base.Strings; @@ -383,12 +384,6 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { if (request.isSetResourceInfo()) { ctx.getSessionVariable().setResourceGroup(request.getResourceInfo().getGroup()); } - if (request.isSetExecMemLimit()) { - ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit()); - } - if (request.isSetQueryTimeout()) { - ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout()); - } if (request.isSetUser_ip()) { 
ctx.setRemoteIP(request.getUser_ip()); } @@ -401,9 +396,6 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { if (request.isSetSqlMode()) { ctx.getSessionVariable().setSqlMode(request.sqlMode); } - if (request.isSetLoadMemLimit()) { - ctx.getSessionVariable().setLoadMemLimit(request.loadMemLimit); - } if (request.isSetEnableStrictMode()) { ctx.getSessionVariable().setEnableInsertStrict(request.enableStrictMode); } @@ -411,6 +403,38 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { UserIdentity currentUserIdentity = UserIdentity.fromThrift(request.getCurrent_user_ident()); ctx.setCurrentUserIdentity(currentUserIdentity); } + + if (request.isSetQuery_options()) { + TQueryOptions queryOptions = request.getQuery_options(); + if (queryOptions.isSetMem_limit()) { + ctx.getSessionVariable().setMaxExecMemByte(queryOptions.getMem_limit()); + } + if (queryOptions.isSetQuery_timeout()) { + ctx.getSessionVariable().setQueryTimeoutS(queryOptions.getQuery_timeout()); + } + if (queryOptions.isSetLoad_mem_limit()) { + ctx.getSessionVariable().setLoadMemLimit(queryOptions.getLoad_mem_limit()); + } + if (queryOptions.isSetMax_scan_key_num()) { + ctx.getSessionVariable().setMaxScanKeyNum(queryOptions.getMax_scan_key_num()); + } + if (queryOptions.isSetMax_pushdown_conditions_per_column()) { + ctx.getSessionVariable().setMaxPushdownConditionsPerColumn( + queryOptions.getMax_pushdown_conditions_per_column()); + } + } else { + // for compatibility, all following variables are moved to TQueryOptions. 
+ if (request.isSetExecMemLimit()) { + ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit()); + } + if (request.isSetQueryTimeout()) { + ctx.getSessionVariable().setQueryTimeoutS(request.getQueryTimeout()); + } + if (request.isSetLoadMemLimit()) { + ctx.getSessionVariable().setLoadMemLimit(request.loadMemLimit); + } + } + ctx.setThreadLocalInfo(); if (ctx.getCurrentUserIdentity() == null) { diff --git a/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index cf8ae2ada57494..4d0aa0c5e87fc4 100644 --- a/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -23,6 +23,7 @@ import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryOptions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -79,15 +80,18 @@ private void forward() throws Exception { params.setDb(ctx.getDatabase()); params.setSqlMode(ctx.getSessionVariable().getSqlMode()); params.setResourceInfo(ctx.toResourceCtx()); - params.setExecMemLimit(ctx.getSessionVariable().getMaxExecMemByte()); - params.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS()); params.setUser_ip(ctx.getRemoteIP()); params.setTime_zone(ctx.getSessionVariable().getTimeZone()); params.setStmt_id(ctx.getStmtId()); - params.setLoadMemLimit(ctx.getSessionVariable().getLoadMemLimit()); params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict()); params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift()); + TQueryOptions queryOptions = new TQueryOptions(); + queryOptions.setMem_limit(ctx.getSessionVariable().getMaxExecMemByte()); + queryOptions.setQuery_timeout(ctx.getSessionVariable().getQueryTimeoutS()); + queryOptions.setLoad_mem_limit(ctx.getSessionVariable().getLoadMemLimit()); + 
params.setQuery_options(queryOptions); + LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); boolean isReturnToPool = false; diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 1a0e4545f150aa..70292740f50a7c 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -96,6 +96,10 @@ public class SessionVariable implements Serializable, Writable { public static final String STORAGE_ENGINE = "storage_engine"; public static final String DIV_PRECISION_INCREMENT = "div_precision_increment"; + // see comment of `doris_max_scan_key_num` and `max_pushdown_conditions_per_column` in BE config + public static final String MAX_SCAN_KEY_NUM = "max_scan_key_num"; + public static final String MAX_PUSHDOWN_CONDITIONS_PER_COLUMN = "max_pushdown_conditions_per_column"; + // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; @@ -242,6 +246,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DIV_PRECISION_INCREMENT) private int divPrecisionIncrement = 4; + // -1 means unset, BE will use its config value + @VariableMgr.VarAttr(name = MAX_SCAN_KEY_NUM) + private int maxScanKeyNum = -1; + @VariableMgr.VarAttr(name = MAX_PUSHDOWN_CONDITIONS_PER_COLUMN) + private int maxPushdownConditionsPerColumn = -1; + public long getMaxExecMemByte() { return maxExecMemByte; } @@ -455,6 +465,22 @@ public String getDefaultRowsetType() { return defaultRowsetType; } + public int getMaxScanKeyNum() { + return maxScanKeyNum; + } + + public void setMaxScanKeyNum(int maxScanKeyNum) { + this.maxScanKeyNum = maxScanKeyNum; + } + + public int getMaxPushdownConditionsPerColumn() { + return maxPushdownConditionsPerColumn; + } + + public void setMaxPushdownConditionsPerColumn(int maxPushdownConditionsPerColumn) { + 
this.maxPushdownConditionsPerColumn = maxPushdownConditionsPerColumn; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { @@ -474,6 +500,13 @@ public TQueryOptions toThrift() { tResult.setBatch_size(batchSize); tResult.setDisable_stream_preaggregations(disableStreamPreaggregations); tResult.setLoad_mem_limit(loadMemLimit); + + if (maxScanKeyNum > -1) { + tResult.setMax_scan_key_num(maxScanKeyNum); + } + if (maxPushdownConditionsPerColumn > -1) { + tResult.setMax_pushdown_conditions_per_column(maxPushdownConditionsPerColumn); + } return tResult; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 9b1ea33af6a264..d15a099f2ece31 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -413,17 +413,18 @@ struct TMasterOpRequest { 3: required string sql 4: optional Types.TResourceInfo resourceInfo 5: optional string cluster - 6: optional i64 execMemLimit - 7: optional i32 queryTimeout + 6: optional i64 execMemLimit // deprecated, move into query_options + 7: optional i32 queryTimeout // deprecated, move into query_options 8: optional string user_ip 9: optional string time_zone 10: optional i64 stmt_id 11: optional i64 sqlMode - 12: optional i64 loadMemLimit + 12: optional i64 loadMemLimit // deprecated, move into query_options 13: optional bool enableStrictMode // this can replace the "user" field 14: optional Types.TUserIdentity current_user_ident 15: optional i32 stmtIdx // the idx of the sql in multi statements + 16: optional PaloInternalService.TQueryOptions query_options } struct TColumnDefinition { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b18467d987e068..8845c81c01dde7 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -130,7 +130,14 @@ struct TQueryOptions { // if this is a query option for LOAD, load_mem_limit should be set to 
limit the mem comsuption // of load channel. 28: optional i64 load_mem_limit = 0; + // see BE config `doris_max_scan_key_num` for details + // if set, this will overwrite the BE config. + 29: optional i32 max_scan_key_num; + // see BE config `max_pushdown_conditions_per_column` for details + // if set, this will overwrite the BE config. + 30: optional i32 max_pushdown_conditions_per_column } + // A scan range plus the parameters needed to execute that scan. struct TScanRangeParams {