Skip to content

Commit

Permalink
Improve fulltext performance (#2329)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Update file io for fulltext search
Disable log in phrase iterator in release build

Issue link:#1320

### Type of change

- [x] Refactoring
- [x] Performance Improvement
  • Loading branch information
yangzq50 authored Dec 6, 2024
1 parent 076f58f commit 115bf83
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 81 deletions.
15 changes: 9 additions & 6 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope
append_data_block();
// 4.2 output
{
OutputToDataBlockHelper output_to_data_block_helper;
u32 output_block_idx = output_data_blocks.size() - 1;
Vector<SizeT> &column_ids = base_table_ref_->column_ids_;
SizeT column_n = column_ids.size();
u32 block_capacity = DEFAULT_BLOCK_CAPACITY;
Expand All @@ -283,29 +285,30 @@ bool PhysicalMatch::ExecuteInner(QueryContext *query_context, OperatorState *ope
output_block_ptr->Finalize();
append_data_block();
output_block_ptr = output_data_blocks.back().get();
++output_block_idx;
output_block_row_id = 0;
}
const RowID &row_id = row_id_result[output_id];
u32 segment_id = row_id.segment_id_;
u32 segment_offset = row_id.segment_offset_;
u16 block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
u16 block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;
BlockEntry *block_entry = base_table_ref_->block_index_->GetBlockEntry(segment_id, block_id);
assert(block_entry != nullptr);
SizeT column_id = 0;

for (; column_id < column_n; ++column_id) {
ColumnVector column_vector = block_entry->GetConstColumnVector(query_context->storage()->buffer_manager(), column_ids[column_id]);
output_block_ptr->column_vectors[column_id]->AppendWith(column_vector, block_offset, 1);
output_to_data_block_helper
.AddOutputJobInfo(segment_id, block_id, column_ids[column_id], block_offset, output_block_idx, column_id, output_block_row_id);
output_block_ptr->column_vectors[column_id]->Finalize(output_block_ptr->column_vectors[column_id]->Size() + 1);
}
Value v = Value::MakeFloat(score_result[output_id]);
output_block_ptr->column_vectors[column_id++]->AppendValue(v);
output_block_ptr->column_vectors[column_id]->AppendWith(row_id, 1);
++output_block_row_id;
}
output_block_ptr->Finalize();
output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(),
base_table_ref_->block_index_.get(),
output_data_blocks);
}

operator_state->SetComplete();
ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager();
if (cache_result_ && cache_mgr != nullptr) {
Expand Down
57 changes: 8 additions & 49 deletions src/executor/operator/physical_scan/physical_match_tensor_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,55 +431,14 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen
} else {
// all task Complete
const u32 result_n = function_data.End();
const auto output_type_ptr = GetOutputTypes();
{
// prepare output data block
const u32 total_data_row_count = result_n;
u32 row_idx = 0;
do {
auto data_block = DataBlock::MakeUniquePtr();
data_block->Init(*output_type_ptr);
operator_state->data_block_array_.emplace_back(std::move(data_block));
row_idx += DEFAULT_BLOCK_CAPACITY;
} while (row_idx < total_data_row_count);
}
u32 output_block_row_id = 0;
u32 output_block_idx = 0;
DataBlock *output_block_ptr = operator_state->data_block_array_[output_block_idx].get();
const float *result_scores = function_data.score_result_.get();
const RowID *result_row_ids = function_data.row_id_result_.get();
for (u32 top_idx = 0; top_idx < result_n; ++top_idx) {
const SegmentID segment_id = result_row_ids[top_idx].segment_id_;
const SegmentOffset segment_offset = result_row_ids[top_idx].segment_offset_;
const BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
const BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;
BlockEntry *block_entry = block_index->GetBlockEntry(segment_id, block_id);
if (block_entry == nullptr) {
String error_message = fmt::format("Cannot find segment id: {}, block id: {}", segment_id, block_id);
UnrecoverableError(error_message);
}
if (output_block_row_id == DEFAULT_BLOCK_CAPACITY) {
output_block_ptr->Finalize();
++output_block_idx;
output_block_ptr = operator_state->data_block_array_[output_block_idx].get();
output_block_row_id = 0;
}
const SizeT column_n = base_table_ref_->column_ids_.size();
for (SizeT i = 0; i < column_n; ++i) {
const auto column_id = base_table_ref_->column_ids_[i];
auto column_vector = block_entry->GetConstColumnVector(buffer_mgr, column_id);
output_block_ptr->column_vectors[i]->AppendWith(column_vector, block_offset, 1);
}
output_block_ptr->AppendValueByPtr(column_n, (ptr_t)&result_scores[top_idx]);
output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&result_row_ids[top_idx]);
++output_block_row_id;
}
output_block_ptr->Finalize();

ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager();
if (cache_result_ && cache_mgr != nullptr) {
AddCache(query_context, cache_mgr, operator_state->data_block_array_);
}
float *result_scores = function_data.score_result_.get();
RowID *result_row_ids = function_data.row_id_result_.get();
SetOutput(Vector<char *>{reinterpret_cast<char *>(result_scores)},
Vector<RowID *>{result_row_ids},
sizeof(std::remove_pointer_t<decltype(result_scores)>),
result_n,
query_context,
operator_state);
operator_state->SetComplete();
}
}
Expand Down
25 changes: 10 additions & 15 deletions src/executor/operator/physical_scan/physical_scan_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ void PhysicalScanBase::SetOutput(const Vector<char *> &raw_result_dists_list,
SizeT result_size,
i64 result_n,
QueryContext *query_context,
OperatorState *operator_state) {
OperatorState *operator_state) const {
BlockIndex *block_index = base_table_ref_->block_index_.get();
SizeT query_n = raw_result_dists_list.size();
if (query_n != 1u) {
UnrecoverableError(fmt::format("{}: Unexpected: more than 1 query?", __func__));
}

{
SizeT total_data_row_count = query_n * result_n;
Expand All @@ -104,28 +107,21 @@ void PhysicalScanBase::SetOutput(const Vector<char *> &raw_result_dists_list,
row_idx += DEFAULT_BLOCK_CAPACITY;
} while (row_idx < total_data_row_count);
}
auto *buffer_mgr = query_context->storage()->buffer_manager();

OutputToDataBlockHelper output_to_data_block_helper;
SizeT output_block_row_id = 0;
SizeT output_block_idx = 0;
DataBlock *output_block_ptr = operator_state->data_block_array_[output_block_idx].get();
for (SizeT query_idx = 0; query_idx < query_n; ++query_idx) {
char *raw_result_dists = raw_result_dists_list[query_idx];
RowID *row_ids = row_ids_list[query_idx];
for (i64 top_idx = 0; top_idx < result_n; ++top_idx) {
SizeT id = query_n * query_idx + top_idx;

SegmentID segment_id = row_ids[top_idx].segment_id_;
SegmentOffset segment_offset = row_ids[top_idx].segment_offset_;
BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;

BlockEntry *block_entry = block_index->GetBlockEntry(segment_id, block_id);
if (block_entry == nullptr) {
String error_message = fmt::format("Cannot find segment id: {}, block id: {}", segment_id, block_id);
UnrecoverableError(error_message);
}

if (output_block_row_id == DEFAULT_BLOCK_CAPACITY) {
output_block_ptr->Finalize();
++output_block_idx;
Expand All @@ -136,18 +132,17 @@ void PhysicalScanBase::SetOutput(const Vector<char *> &raw_result_dists_list,
SizeT column_n = base_table_ref_->column_ids_.size();
for (SizeT i = 0; i < column_n; ++i) {
SizeT column_id = base_table_ref_->column_ids_[i];
ColumnVector &&column_vector = block_entry->GetConstColumnVector(buffer_mgr, column_id);

output_block_ptr->column_vectors[i]->AppendWith(column_vector, block_offset, 1);
output_to_data_block_helper.AddOutputJobInfo(segment_id, block_id, column_id, block_offset, output_block_idx, i, output_block_row_id);
output_block_ptr->column_vectors[i]->Finalize(output_block_ptr->column_vectors[i]->Size() + 1);
}
output_block_ptr->AppendValueByPtr(column_n, raw_result_dists + id * result_size);
output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&row_ids[id]);
output_block_ptr->AppendValueByPtr(column_n, raw_result_dists + top_idx * result_size);
output_block_ptr->AppendValueByPtr(column_n + 1, (ptr_t)&row_ids[top_idx]);

++output_block_row_id;
}
}
output_block_ptr->Finalize();

output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(), block_index, operator_state->data_block_array_);
ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager();
if (cache_result_ && cache_mgr != nullptr) {
AddCache(query_context, cache_mgr, operator_state->data_block_array_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected:
SizeT result_size,
i64 result_n,
QueryContext *query_context,
OperatorState *operator_state);
OperatorState *operator_state) const;

void AddCache(QueryContext *query_context, ResultCacheManager *cache_mgr, const Vector<UniquePtr<DataBlock>> &output_data_blocks) const;

Expand Down
44 changes: 35 additions & 9 deletions src/executor/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import data_block;
import txn;
import table_entry;
import cached_match;
import buffer_manager;
import block_index;

namespace infinity {

Expand All @@ -52,10 +54,10 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope
// FIXME: After columnar reading is supported, use a different table_ref for each LoadMetas
auto table_ref = table_refs[load_metas[0].binding_.table_idx];
if (table_ref.get() == nullptr) {
String error_message = "TableRef not found";
UnrecoverableError(error_message);
UnrecoverableError("TableRef not found");
}

OutputToDataBlockHelper output_to_data_block_helper;
for (SizeT i = 0; i < operator_state->prev_op_state_->data_block_array_.size(); ++i) {
auto input_block = operator_state->prev_op_state_->data_block_array_[i].get();
SizeT load_column_count = load_metas_->size();
Expand All @@ -69,7 +71,7 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope
auto column_vector_type =
(load_metas[j].type_->type() == LogicalType::kBoolean) ? ColumnVectorType::kCompactBit : ColumnVectorType::kFlat;
column_vector->Initialize(column_vector_type, capacity);

column_vector->Finalize(row_count);
input_block->InsertVector(column_vector, load_metas[j].index_);
}

Expand All @@ -82,16 +84,15 @@ void PhysicalOperator::InputLoad(QueryContext *query_context, OperatorState *ope
u32 segment_offset = row_id.segment_offset_;
u16 block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
u16 block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;

BlockEntry *block_entry = table_ref->block_index_->GetBlockEntry(segment_id, block_id);
for (SizeT k = 0; k < load_column_count; ++k) {
auto binding = load_metas[k].binding_;

ColumnVector column_vector = block_entry->GetConstColumnVector(query_context->storage()->buffer_manager(), binding.column_idx);
input_block->column_vectors[load_metas[k].index_]->AppendWith(column_vector, block_offset, 1);
output_to_data_block_helper
.AddOutputJobInfo(segment_id, block_id, load_metas[k].binding_.column_idx, block_offset, i, load_metas[k].index_, j);
}
}
}
output_to_data_block_helper.OutputToDataBlock(query_context->storage()->buffer_manager(),
table_ref->block_index_.get(),
operator_state->prev_op_state_->data_block_array_);
}

SharedPtr<Vector<String>> PhysicalCommonFunctionUsingLoadMeta::GetOutputNames(const PhysicalOperator &op) {
Expand All @@ -116,4 +117,29 @@ SharedPtr<Vector<SharedPtr<DataType>>> PhysicalCommonFunctionUsingLoadMeta::GetO
return output_types;
}

void OutputToDataBlockHelper::OutputToDataBlock(BufferManager *buffer_mgr,
const BlockIndex *block_index,
const Vector<UniquePtr<DataBlock>> &output_data_blocks) {
std::sort(output_job_infos.begin(), output_job_infos.end());
auto cache_segment_id = std::numeric_limits<SegmentID>::max();
auto cache_block_id = std::numeric_limits<BlockID>::max();
BlockEntry *cache_block_entry = nullptr;
auto cache_column_id = std::numeric_limits<ColumnID>::max();
ColumnVector cache_column_vector;
for (const auto [segment_id, block_id, column_id, block_offset, output_block_id, output_column_id, output_row_id] : output_job_infos) {
if (segment_id != cache_segment_id || block_id != cache_block_id) {
cache_segment_id = segment_id;
cache_block_id = block_id;
cache_block_entry = block_index->GetBlockEntry(segment_id, block_id);
cache_column_id = std::numeric_limits<ColumnID>::max();
}
if (column_id != cache_column_id) {
cache_column_vector = cache_block_entry->GetConstColumnVector(buffer_mgr, column_id);
}
auto val_for_update = cache_column_vector.GetValue(block_offset);
output_data_blocks[output_block_id]->column_vectors[output_column_id]->SetValue(output_row_id, val_for_update);
}
output_job_infos.clear();
}

} // namespace infinity
30 changes: 30 additions & 0 deletions src/executor/physical_operator.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,34 @@ export struct PhysicalCommonFunctionUsingLoadMeta {
static SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes(const PhysicalOperator &op);
};

struct OutputJobInfo {
// src data info
SegmentID segment_id_{};
BlockID block_id_{};
ColumnID column_id_{};
BlockOffset block_offset_{};
// target position
u32 output_block_id_{};
u32 output_column_id_{};
u32 output_row_id_{};
friend auto operator<=>(const OutputJobInfo &, const OutputJobInfo &) = default;
};

class BufferManager;
struct BlockIndex;
struct DataBlock;
export struct OutputToDataBlockHelper {
Vector<OutputJobInfo> output_job_infos;
void AddOutputJobInfo(const SegmentID segment_id,
const BlockID block_id,
const ColumnID column_id,
const BlockOffset block_offset,
const u32 output_block_id,
const u32 output_column_id,
const u32 output_row_id) {
output_job_infos.emplace_back(segment_id, block_id, column_id, block_offset, output_block_id, output_column_id, output_row_id);
}
void OutputToDataBlock(BufferManager *buffer_mgr, const BlockIndex *block_index, const Vector<UniquePtr<DataBlock>> &output_data_blocks);
};

} // namespace infinity
4 changes: 4 additions & 0 deletions src/storage/invertedindex/search/phrase_doc_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ bool PhraseDocIterator::GetExactPhraseMatchData() {
begin_positions.push_back(now_position0);
}
}
#ifdef INFINITY_DEBUG
if (SHOULD_LOG_DEBUG()) {
std::ostringstream oss;
oss << "Phrase \"" << terms_ptr_->at(0);
Expand All @@ -152,6 +153,7 @@ bool PhraseDocIterator::GetExactPhraseMatchData() {
}
LOG_DEBUG(oss.str());
}
#endif
if (begin_positions.empty()) {
return false;
}
Expand Down Expand Up @@ -265,6 +267,7 @@ bool PhraseDocIterator::GetSloppyPhraseMatchData() {
for (auto &solution : solutions) {
tf_ += 1.0F / (1.0F + solution.matchLength);
}
#ifdef INFINITY_DEBUG
if (SHOULD_LOG_DEBUG()) {
std::ostringstream oss;
oss << "Phrase \"" << terms_ptr_->at(0);
Expand All @@ -283,6 +286,7 @@ bool PhraseDocIterator::GetSloppyPhraseMatchData() {
}
LOG_DEBUG(oss.str());
}
#endif
if (!solutions.empty()) {
doc_freq_++;
all_tf_.push_back(tf_);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/search/query_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,8 @@ std::unique_ptr<DocIterator> OrQueryNode::CreateSearch(const CreateSearchParams
UnrecoverableError("Unreachable code");
return nullptr;
}
if (params.early_term_algo == EarlyTermAlgo::kAuto && params.ft_similarity == FulltextSimilarity::kBM25) {
if ((params.early_term_algo == EarlyTermAlgo::kAuto || params.early_term_algo == EarlyTermAlgo::kBatch) &&
params.ft_similarity == FulltextSimilarity::kBM25) {
// try to apply batch when possible
// collect all term children info
u64 total_df = 0u;
Expand Down

0 comments on commit 115bf83

Please sign in to comment.