From 115bf838886258ba3dd1a05edc81543306a7382f Mon Sep 17 00:00:00 2001 From: yangzq50 Date: Fri, 6 Dec 2024 15:09:37 +0800 Subject: [PATCH] Improve fulltext performance (#2329) ### What problem does this PR solve? Update file io for fulltext search Disable log in phrase iterator in release build Issue link:#1320 ### Type of change - [x] Refactoring - [x] Performance Improvement --- src/executor/operator/physical_match.cpp | 15 +++-- .../physical_match_tensor_scan.cpp | 57 +++---------------- .../physical_scan/physical_scan_base.cpp | 25 ++++---- .../physical_scan/physical_scan_base.cppm | 2 +- src/executor/physical_operator.cpp | 44 +++++++++++--- src/executor/physical_operator.cppm | 30 ++++++++++ .../search/phrase_doc_iterator.cpp | 4 ++ .../invertedindex/search/query_node.cpp | 3 +- 8 files changed, 99 insertions(+), 81 deletions(-) diff --git a/src/executor/operator/physical_match.cpp b/src/executor/operator/physical_match.cpp index e1ef946311..6173cd1c5e 100644 --- a/src/executor/operator/physical_match.cpp +++ b/src/executor/operator/physical_match.cpp @@ -272,6 +272,8 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope append_data_block(); // 4.2 output { + OutputToDataBlockHelper output_to_data_block_helper; + u32 output_block_idx = output_data_blocks.size() - 1; Vector &column_ids = base_table_ref_->column_ids_; SizeT column_n = column_ids.size(); u32 block_capacity = DEFAULT_BLOCK_CAPACITY; @@ -283,6 +285,7 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope output_block_ptr->Finalize(); append_data_block(); output_block_ptr = output_data_blocks.back().get(); + ++output_block_idx; output_block_row_id = 0; } const RowID &row_id = row_id_result[output_id]; @@ -290,13 +293,11 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope u32 segment_offset = row_id.segment_offset_; u16 block_id = segment_offset / DEFAULT_BLOCK_CAPACITY; u16 block_offset = segment_offset % 
DEFAULT_BLOCK_CAPACITY; - BlockEntry *block_entry = base_table_ref_->block_index_->GetBlockEntry(segment_id, block_id); - assert(block_entry != nullptr); SizeT column_id = 0; - for (; column_id < column_n; ++column_id) { - ColumnVector column_vector = block_entry->GetConstColumnVector(query_context->storage()->buffer_manager(), column_ids[column_id]); - output_block_ptr->column_vectors[column_id]->AppendWith(column_vector, block_offset, 1); + output_to_data_block_helper + .AddOutputJobInfo(segment_id, block_id, column_ids[column_id], block_offset, output_block_idx, column_id, output_block_row_id); + output_block_ptr->column_vectors[column_id]->Finalize(output_block_ptr->column_vectors[column_id]->Size() + 1); } Value v = Value::MakeFloat(score_result[output_id]); output_block_ptr->column_vectors[column_id++]->AppendValue(v); @@ -304,8 +305,10 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope ++output_block_row_id; } output_block_ptr->Finalize(); + output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(), + base_table_ref_->block_index_.get(), + output_data_blocks); } - operator_state->SetComplete(); ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); if (cache_result_ && cache_mgr != nullptr) { diff --git a/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp b/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp index 806ff9acec..8fbede0534 100644 --- a/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp +++ b/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp @@ -431,55 +431,14 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen } else { // all task Complete const u32 result_n = function_data.End(); - const auto output_type_ptr = GetOutputTypes(); - { - // prepare output data block - const u32 total_data_row_count = result_n; - u32 row_idx = 0; - do { - auto data_block = 
DataBlock::MakeUniquePtr(); - data_block->Init(*output_type_ptr); - operator_state->data_block_array_.emplace_back(std::move(data_block)); - row_idx += DEFAULT_BLOCK_CAPACITY; - } while (row_idx < total_data_row_count); - } - u32 output_block_row_id = 0; - u32 output_block_idx = 0; - DataBlock *output_block_ptr = operator_state->data_block_array_[output_block_idx].get(); - const float *result_scores = function_data.score_result_.get(); - const RowID *result_row_ids = function_data.row_id_result_.get(); - for (u32 top_idx = 0; top_idx < result_n; ++top_idx) { - const SegmentID segment_id = result_row_ids[top_idx].segment_id_; - const SegmentOffset segment_offset = result_row_ids[top_idx].segment_offset_; - const BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY; - const BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY; - BlockEntry *block_entry = block_index->GetBlockEntry(segment_id, block_id); - if (block_entry == nullptr) { - String error_message = fmt::format("Cannot find segment id: {}, block id: {}", segment_id, block_id); - UnrecoverableError(error_message); - } - if (output_block_row_id == DEFAULT_BLOCK_CAPACITY) { - output_block_ptr->Finalize(); - ++output_block_idx; - output_block_ptr = operator_state->data_block_array_[output_block_idx].get(); - output_block_row_id = 0; - } - const SizeT column_n = base_table_ref_->column_ids_.size(); - for (SizeT i = 0; i < column_n; ++i) { - const auto column_id = base_table_ref_->column_ids_[i]; - auto column_vector = block_entry->GetConstColumnVector(buffer_mgr, column_id); - output_block_ptr->column_vectors[i]->AppendWith(column_vector, block_offset, 1); - } - output_block_ptr->AppendValueByPtr(column_n, (ptr_t)&result_scores[top_idx]); - output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&result_row_ids[top_idx]); - ++output_block_row_id; - } - output_block_ptr->Finalize(); - - ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); - if (cache_result_ && 
cache_mgr != nullptr) { - AddCache(query_context, cache_mgr, operator_state->data_block_array_); - } + float *result_scores = function_data.score_result_.get(); + RowID *result_row_ids = function_data.row_id_result_.get(); + SetOutput(Vector{reinterpret_cast(result_scores)}, + Vector{result_row_ids}, + sizeof(std::remove_pointer_t), + result_n, + query_context, + operator_state); operator_state->SetComplete(); } } diff --git a/src/executor/operator/physical_scan/physical_scan_base.cpp b/src/executor/operator/physical_scan/physical_scan_base.cpp index 0171589b40..ea772504df 100644 --- a/src/executor/operator/physical_scan/physical_scan_base.cpp +++ b/src/executor/operator/physical_scan/physical_scan_base.cpp @@ -90,9 +90,12 @@ void PhysicalScanBase::SetOutput(const Vector &raw_result_dists_list, SizeT result_size, i64 result_n, QueryContext *query_context, - OperatorState *operator_state) { + OperatorState *operator_state) const { BlockIndex *block_index = base_table_ref_->block_index_.get(); SizeT query_n = raw_result_dists_list.size(); + if (query_n != 1u) { + UnrecoverableError(fmt::format("{}: Unexpected: more than 1 query?", __func__)); + } { SizeT total_data_row_count = query_n * result_n; @@ -104,8 +107,8 @@ void PhysicalScanBase::SetOutput(const Vector &raw_result_dists_list, row_idx += DEFAULT_BLOCK_CAPACITY; } while (row_idx < total_data_row_count); } - auto *buffer_mgr = query_context->storage()->buffer_manager(); + OutputToDataBlockHelper output_to_data_block_helper; SizeT output_block_row_id = 0; SizeT output_block_idx = 0; DataBlock *output_block_ptr = operator_state->data_block_array_[output_block_idx].get(); @@ -113,19 +116,12 @@ void PhysicalScanBase::SetOutput(const Vector &raw_result_dists_list, char *raw_result_dists = raw_result_dists_list[query_idx]; RowID *row_ids = row_ids_list[query_idx]; for (i64 top_idx = 0; top_idx < result_n; ++top_idx) { - SizeT id = query_n * query_idx + top_idx; SegmentID segment_id = row_ids[top_idx].segment_id_; 
SegmentOffset segment_offset = row_ids[top_idx].segment_offset_; BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY; BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY; - BlockEntry *block_entry = block_index->GetBlockEntry(segment_id, block_id); - if (block_entry == nullptr) { - String error_message = fmt::format("Cannot find segment id: {}, block id: {}", segment_id, block_id); - UnrecoverableError(error_message); - } - if (output_block_row_id == DEFAULT_BLOCK_CAPACITY) { output_block_ptr->Finalize(); ++output_block_idx; @@ -136,18 +132,17 @@ void PhysicalScanBase::SetOutput(const Vector &raw_result_dists_list, SizeT column_n = base_table_ref_->column_ids_.size(); for (SizeT i = 0; i < column_n; ++i) { SizeT column_id = base_table_ref_->column_ids_[i]; - ColumnVector &&column_vector = block_entry->GetConstColumnVector(buffer_mgr, column_id); - - output_block_ptr->column_vectors[i]->AppendWith(column_vector, block_offset, 1); + output_to_data_block_helper.AddOutputJobInfo(segment_id, block_id, column_id, block_offset, output_block_idx, i, output_block_row_id); + output_block_ptr->column_vectors[i]->Finalize(output_block_ptr->column_vectors[i]->Size() + 1); } - output_block_ptr->AppendValueByPtr(column_n, raw_result_dists + id * result_size); - output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&row_ids[id]); + output_block_ptr->AppendValueByPtr(column_n, raw_result_dists + top_idx * result_size); + output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&row_ids[top_idx]); ++output_block_row_id; } } output_block_ptr->Finalize(); - + output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(), block_index, operator_state->data_block_array_); ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); if (cache_result_ && cache_mgr != nullptr) { AddCache(query_context, cache_mgr, operator_state->data_block_array_); diff --git a/src/executor/operator/physical_scan/physical_scan_base.cppm 
b/src/executor/operator/physical_scan/physical_scan_base.cppm index 32a900497a..f753174f1d 100644 --- a/src/executor/operator/physical_scan/physical_scan_base.cppm +++ b/src/executor/operator/physical_scan/physical_scan_base.cppm @@ -68,7 +68,7 @@ protected: SizeT result_size, i64 result_n, QueryContext *query_context, - OperatorState *operator_state); + OperatorState *operator_state) const; void AddCache(QueryContext *query_context, ResultCacheManager *cache_mgr, const Vector> &output_data_blocks) const; diff --git a/src/executor/physical_operator.cpp b/src/executor/physical_operator.cpp index ede67b950c..5da8190a57 100644 --- a/src/executor/physical_operator.cpp +++ b/src/executor/physical_operator.cpp @@ -37,6 +37,8 @@ import data_block; import txn; import table_entry; import cached_match; +import buffer_manager; +import block_index; namespace infinity { @@ -52,10 +54,10 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope // FIXME: After columnar reading is supported, use a different table_ref for each LoadMetas auto table_ref = table_refs[load_metas[0].binding_.table_idx]; if (table_ref.get() == nullptr) { - String error_message = "TableRef not found"; - UnrecoverableError(error_message); + UnrecoverableError("TableRef not found"); } + OutputToDataBlockHelper output_to_data_block_helper; for (SizeT i = 0; i < operator_state->prev_op_state_->data_block_array_.size(); ++i) { auto input_block = operator_state->prev_op_state_->data_block_array_[i].get(); SizeT load_column_count = load_metas_->size(); @@ -69,7 +71,7 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope auto column_vector_type = (load_metas[j].type_->type() == LogicalType::kBoolean) ? 
ColumnVectorType::kCompactBit : ColumnVectorType::kFlat;
             column_vector->Initialize(column_vector_type, capacity);
-
+            column_vector->Finalize(row_count);
             input_block->InsertVector(column_vector, load_metas[j].index_);
         }
 
@@ -82,16 +84,15 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope
             u32 segment_offset = row_id.segment_offset_;
             u16 block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
             u16 block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;
-
-            BlockEntry *block_entry = table_ref->block_index_->GetBlockEntry(segment_id, block_id);
             for (SizeT k = 0; k < load_column_count; ++k) {
-                auto binding = load_metas[k].binding_;
-
-                ColumnVector column_vector = block_entry->GetConstColumnVector(query_context->storage()->buffer_manager(), binding.column_idx);
-                input_block->column_vectors[load_metas[k].index_]->AppendWith(column_vector, block_offset, 1);
+                output_to_data_block_helper
+                    .AddOutputJobInfo(segment_id, block_id, load_metas[k].binding_.column_idx, block_offset, i, load_metas[k].index_, j);
             }
         }
     }
+    output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(),
+                                                  table_ref->block_index_.get(),
+                                                  operator_state->prev_op_state_->data_block_array_);
 }
 
 SharedPtr> PhysicalCommonFunctionUsingLoadMeta::GetOutputNames(const PhysicalOperator &op) {
@@ -116,4 +117,39 @@ SharedPtr>> PhysicalCommonFunctionUsingLoadMeta::GetO
     return output_types;
 }
 
+// Executes all queued output jobs: each job copies one cell from a source block column
+// into (output_block_id, output_column_id, output_row_id) of output_data_blocks.
+// Jobs are sorted first, so jobs sharing a source (segment, block, column) are adjacent
+// and the block entry / column vector only has to be fetched when the source changes.
+void OutputToDataBlockHelper::OutputToDataBlock(BufferManager *buffer_mgr,
+                                                const BlockIndex *block_index,
+                                                const Vector<UniquePtr<DataBlock>> &output_data_blocks) {
+    std::sort(output_job_infos.begin(), output_job_infos.end());
+    auto cache_segment_id = std::numeric_limits<SegmentID>::max();
+    auto cache_block_id = std::numeric_limits<BlockID>::max();
+    BlockEntry *cache_block_entry = nullptr;
+    auto cache_column_id = std::numeric_limits<ColumnID>::max();
+    ColumnVector cache_column_vector;
+    for (const auto [segment_id, block_id, column_id, block_offset, output_block_id, output_column_id, output_row_id] : output_job_infos) {
+        if (segment_id != cache_segment_id || block_id != cache_block_id) {
+            cache_segment_id = segment_id;
+            cache_block_id = block_id;
+            cache_block_entry = block_index->GetBlockEntry(segment_id, block_id);
+            if (cache_block_entry == nullptr) {
+                UnrecoverableError("OutputToDataBlock: cannot find block entry");
+            }
+            // A different source block invalidates the cached column vector.
+            cache_column_id = std::numeric_limits<ColumnID>::max();
+        }
+        if (column_id != cache_column_id) {
+            cache_column_vector = cache_block_entry->GetConstColumnVector(buffer_mgr, column_id);
+            // Remember which column is cached; otherwise the vector is re-fetched for every job.
+            cache_column_id = column_id;
+        }
+        auto val_for_update = cache_column_vector.GetValue(block_offset);
+        output_data_blocks[output_block_id]->column_vectors[output_column_id]->SetValue(output_row_id, val_for_update);
+    }
+    output_job_infos.clear();
+}
+
 } // namespace infinity
diff --git a/src/executor/physical_operator.cppm b/src/executor/physical_operator.cppm
index 21a304a30a..54bbec0c4f 100644
--- a/src/executor/physical_operator.cppm
+++ b/src/executor/physical_operator.cppm
@@ -124,4 +124,34 @@ export struct PhysicalCommonFunctionUsingLoadMeta {
     static SharedPtr>> GetOutputTypes(const PhysicalOperator &op);
 };
 
+struct OutputJobInfo {
+    // src data info
+    SegmentID segment_id_{};
+    BlockID block_id_{};
+    ColumnID column_id_{};
+    BlockOffset block_offset_{};
+    // target position
+    u32 output_block_id_{};
+    u32 output_column_id_{};
+    u32 output_row_id_{};
+    friend auto operator<=>(const OutputJobInfo &, const OutputJobInfo &) = default;
+};
+
+class BufferManager;
+struct BlockIndex;
+struct DataBlock;
+export struct OutputToDataBlockHelper {
+    Vector<OutputJobInfo> output_job_infos;
+    void AddOutputJobInfo(const SegmentID segment_id,
+                          const BlockID block_id,
+                          const ColumnID column_id,
+                          const BlockOffset block_offset,
+                          const u32 output_block_id,
+                          const u32 output_column_id,
+                          const u32 output_row_id) {
+        output_job_infos.emplace_back(segment_id, block_id, column_id, block_offset, output_block_id, output_column_id, output_row_id);
+    }
+    void OutputToDataBlock(BufferManager *buffer_mgr, const BlockIndex *block_index, const Vector<UniquePtr<DataBlock>> &output_data_blocks);
+};
+
 } // namespace infinity
diff --git
a/src/storage/invertedindex/search/phrase_doc_iterator.cpp b/src/storage/invertedindex/search/phrase_doc_iterator.cpp index 8a0b2208d9..d1eed06d26 100644 --- a/src/storage/invertedindex/search/phrase_doc_iterator.cpp +++ b/src/storage/invertedindex/search/phrase_doc_iterator.cpp @@ -140,6 +140,7 @@ bool PhraseDocIterator::GetExactPhraseMatchData() { begin_positions.push_back(now_position0); } } +#ifdef INFINITY_DEBUG if (SHOULD_LOG_DEBUG()) { std::ostringstream oss; oss << "Phrase \"" << terms_ptr_->at(0); @@ -152,6 +153,7 @@ bool PhraseDocIterator::GetExactPhraseMatchData() { } LOG_DEBUG(oss.str()); } +#endif if (begin_positions.empty()) { return false; } @@ -265,6 +267,7 @@ bool PhraseDocIterator::GetSloppyPhraseMatchData() { for (auto &solution : solutions) { tf_ += 1.0F / (1.0F + solution.matchLength); } +#ifdef INFINITY_DEBUG if (SHOULD_LOG_DEBUG()) { std::ostringstream oss; oss << "Phrase \"" << terms_ptr_->at(0); @@ -283,6 +286,7 @@ bool PhraseDocIterator::GetSloppyPhraseMatchData() { } LOG_DEBUG(oss.str()); } +#endif if (!solutions.empty()) { doc_freq_++; all_tf_.push_back(tf_); diff --git a/src/storage/invertedindex/search/query_node.cpp b/src/storage/invertedindex/search/query_node.cpp index 27741b0e2b..c7f39fc4d7 100644 --- a/src/storage/invertedindex/search/query_node.cpp +++ b/src/storage/invertedindex/search/query_node.cpp @@ -628,7 +628,8 @@ std::unique_ptr OrQueryNode::CreateSearch(const CreateSearchParams UnrecoverableError("Unreachable code"); return nullptr; } - if (params.early_term_algo == EarlyTermAlgo::kAuto && params.ft_similarity == FulltextSimilarity::kBM25) { + if ((params.early_term_algo == EarlyTermAlgo::kAuto || params.early_term_algo == EarlyTermAlgo::kBatch) && + params.ft_similarity == FulltextSimilarity::kBM25) { // try to apply batch when possible // collect all term children info u64 total_df = 0u;