Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix330
Browse files Browse the repository at this point in the history
  • Loading branch information
JinHai-CN committed Oct 16, 2024
2 parents 35fe3c8 + 4b21d50 commit e4387ac
Show file tree
Hide file tree
Showing 60 changed files with 1,563 additions and 177 deletions.
4 changes: 3 additions & 1 deletion src/common/stl.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,12 @@ export namespace std {
using std::isnan;
using std::log2;
using std::make_heap;
using std::push_heap;
using std::pop_heap;
using std::max_element;
using std::min_element;
using std::nearbyint;
using std::partial_sort;
using std::pop_heap;
using std::pow;
using std::reduce;
using std::remove_if;
Expand Down Expand Up @@ -291,6 +292,7 @@ export namespace std {
using std::conditional_t;
using std::remove_pointer_t;
using std::remove_reference_t;
using std::derived_from;

using std::function;
using std::monostate;
Expand Down
38 changes: 38 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import physical_merge_match_sparse;
import physical_merge_aggregate;
import status;
import physical_operator_type;
import physical_read_cache;

import explain_logical_plan;
import logical_show;
Expand All @@ -93,6 +94,7 @@ import common_query_filter;
import table_entry;
import logger;
import show_statement;
import base_table_ref;

namespace infinity {

Expand Down Expand Up @@ -305,6 +307,10 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtr<Vector<S
Explain((PhysicalMergeAggregate *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kReadCache: {
Explain(static_cast<const PhysicalReadCache *>(op), result, intent_size);
break;
}
default: {
String error_message = "Unexpected physical operator type";
UnrecoverableError(error_message);
Expand Down Expand Up @@ -2648,4 +2654,36 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeAggregate *merge_aggregate_
result->emplace_back(MakeShared<String>(output_columns));
}

void ExplainPhysicalPlan::Explain(const PhysicalReadCache *read_cache_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String explain_header_str;
if (intent_size != 0) {
explain_header_str = String(intent_size - 2, ' ') + "-> Read cache ";
} else {
explain_header_str = "Read cache ";
}
explain_header_str += "(" + std::to_string(read_cache_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(explain_header_str));

const BaseTableRef *base_table_ref = read_cache_node->base_table_ref();
// Table alias and name
String table_name = String(intent_size, ' ') + " - table name: (";
table_name += *base_table_ref->schema_name() + ".";
table_name += *base_table_ref->table_name() + ")";
result->emplace_back(MakeShared<String>(table_name));

// Output columns
String output_columns = String(intent_size, ' ') + " - output columns: [";
SizeT column_count = read_cache_node->GetOutputNames()->size();
if (column_count == 0) {
String error_message = "No column in read cache node.";
UnrecoverableError(error_message);
}
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns += read_cache_node->GetOutputNames()->at(idx) + ", ";
}
output_columns += read_cache_node->GetOutputNames()->back();
output_columns += "]";
result->emplace_back(MakeShared<String>(output_columns));
}

} // namespace infinity
3 changes: 3 additions & 0 deletions src/executor/explain_physical_plan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import physical_match_tensor_scan;
import physical_fusion;
import physical_merge_aggregate;
import physical_match_sparse_scan;
import physical_read_cache;

export module explain_physical_plan;

Expand Down Expand Up @@ -178,6 +179,8 @@ public:
static void Explain(const PhysicalFusion *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalReadCache *read_cache_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);
};

} // namespace infinity
3 changes: 2 additions & 1 deletion src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
case PhysicalOperatorType::kInsert:
case PhysicalOperatorType::kImport:
case PhysicalOperatorType::kExport:
case PhysicalOperatorType::kMatch: {
case PhysicalOperatorType::kMatch:
case PhysicalOperatorType::kReadCache: {
current_fragment_ptr->AddOperator(phys_op);
if (phys_op->left() != nullptr or phys_op->right() != nullptr) {
String error_message = fmt::format("{} shouldn't have child.", phys_op->GetName());
Expand Down
5 changes: 4 additions & 1 deletion src/executor/operator/physical_drop_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import txn;
import query_context;
import table_def;
import data_table;

import result_cache_manager;
import physical_operator_type;
import operator_state;
import status;
Expand All @@ -37,6 +37,9 @@ bool PhysicalDropTable::Execute(QueryContext *query_context, OperatorState *oper
auto txn = query_context->GetTxn();

Status status = txn->DropTableCollectionByName(*schema_name_, *table_name_, conflict_type_);
if (ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); cache_mgr != nullptr) {
cache_mgr->DropTable(*schema_name_, *table_name_);
}

if (!status.ok()) {
operator_state->status_ = status;
Expand Down
90 changes: 46 additions & 44 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import segment_entry;
import knn_filter;
import highlighter;
import parse_fulltext_options;
import result_cache_manager;
import cached_match;

namespace infinity {

Expand Down Expand Up @@ -103,7 +105,6 @@ class FilterIterator final : public DocIterator {
void UpdateScoreThreshold(float threshold) override { query_iterator_->UpdateScoreThreshold(threshold); }

// for minimum_should_match parameter
u32 LeafCount() const override { return query_iterator_->LeafCount(); }
u32 MatchCount() const override { return query_iterator_->MatchCount(); }

void PrintTree(std::ostream &os, const String &prefix, bool is_final) const override {
Expand Down Expand Up @@ -144,19 +145,24 @@ struct FilterQueryNode final : public QueryNode {
query_tree_ = std::move(new_query_tree);
}

uint32_t LeafCount() const override { return query_tree_->LeafCount(); }

void PushDownWeight(float factor) override { MultiplyWeight(factor); }

std::unique_ptr<DocIterator>
CreateSearch(const TableEntry *table_entry, const IndexReader &index_reader, EarlyTermAlgo early_term_algo) const override {
std::unique_ptr<DocIterator> CreateSearch(const TableEntry *table_entry,
const IndexReader &index_reader,
const EarlyTermAlgo early_term_algo,
const u32 minimum_should_match) const override {
assert(common_query_filter_ != nullptr);
if (!common_query_filter_->AlwaysTrue() && common_query_filter_->filter_result_count_ == 0)
return nullptr;
auto search_iter = query_tree_->CreateSearch(table_entry, index_reader, early_term_algo);
auto search_iter = query_tree_->CreateSearch(table_entry, index_reader, early_term_algo, minimum_should_match);
if (!search_iter) {
return nullptr;
}
if (common_query_filter_->AlwaysTrue())
if (common_query_filter_->AlwaysTrue()) {
return search_iter;
}
return MakeUnique<FilterIterator>(common_query_filter_, std::move(search_iter));
}

Expand Down Expand Up @@ -186,20 +192,18 @@ void ASSERT_FLOAT_EQ(float bar, u32 i, float a, float b) {
}
}

template <bool use_minimum_should_match>
void ExecuteFTSearchT(UniquePtr<DocIterator> &et_iter, FullTextScoreResultHeap &result_heap, u32 &blockmax_loop_cnt, const u32 minimum_should_match) {
void ExecuteFTSearch(UniquePtr<DocIterator> &et_iter, FullTextScoreResultHeap &result_heap, u32 &blockmax_loop_cnt) {
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter == nullptr) {
LOG_DEBUG(fmt::format("et_iter is nullptr"));
return;
}
while (true) {
++blockmax_loop_cnt;
bool ok = et_iter->Next();
if (!ok) [[unlikely]] {
break;
}
if constexpr (use_minimum_should_match) {
assert(minimum_should_match >= 2);
if (et_iter->MatchCount() < minimum_should_match) {
continue;
}
}
RowID id = et_iter->DocID();
float et_score = et_iter->BM25Score();
if (SHOULD_LOG_DEBUG()) {
Expand All @@ -219,30 +223,6 @@ void ExecuteFTSearchT(UniquePtr<DocIterator> &et_iter, FullTextScoreResultHeap &
}
}

// Runs a full-text search over `et_iter`, choosing at runtime whether the
// minimum_should_match check has to be applied.
//
// The effective threshold is derived from `minimum_should_match_option` and the
// iterator's leaf count. A threshold of 0 or 1 needs no check, so the plain
// ExecuteFTSearchT<false> path is taken; otherwise ExecuteFTSearchT<true> is
// called with the threshold. Results go into `result_heap`, and
// `blockmax_loop_cnt` is forwarded to the worker unchanged.
void ExecuteFTSearch(UniquePtr<DocIterator> &et_iter,
                     FullTextScoreResultHeap &result_heap,
                     u32 &blockmax_loop_cnt,
                     const MinimumShouldMatchOption &minimum_should_match_option) {
    // A null iterator means the fulltext index exists but holds no data.
    if (et_iter == nullptr) {
        LOG_DEBUG(fmt::format("et_iter is nullptr"));
        return;
    }

    // Stays 0 when no option is given.
    u32 required_matches = 0;
    if (!minimum_should_match_option.empty()) {
        required_matches = GetMinimumShouldMatchParameter(minimum_should_match_option, et_iter->LeafCount());
    }

    if (required_matches >= 2) {
        // Threshold is meaningful: filter documents by match count.
        ExecuteFTSearchT<true>(et_iter, result_heap, blockmax_loop_cnt, required_matches);
    } else {
        // Threshold of 0 or 1 needs no check.
        ExecuteFTSearchT<false>(et_iter, result_heap, blockmax_loop_cnt, 0);
    }
}

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-variable"
#pragma clang diagnostic ignored "-Wunused-but-set-variable"
Expand Down Expand Up @@ -310,13 +290,13 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
full_text_query_context.query_tree_ = MakeUnique<FilterQueryNode>(common_query_filter_.get(), std::move(query_tree_));

if (use_block_max_iter) {
et_iter = query_builder.CreateSearch(full_text_query_context, early_term_algo_);
et_iter = query_builder.CreateSearch(full_text_query_context, early_term_algo_, minimum_should_match_option_);
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter != nullptr)
et_iter->UpdateScoreThreshold(begin_threshold_);
}
if (use_ordinary_iter) {
doc_iterator = query_builder.CreateSearch(full_text_query_context, EarlyTermAlgo::kNaive);
doc_iterator = query_builder.CreateSearch(full_text_query_context, EarlyTermAlgo::kNaive, minimum_should_match_option_);
}

// 3 full text search
Expand All @@ -331,7 +311,7 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
#ifdef INFINITY_DEBUG
auto blockmax_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt, minimum_should_match_option_);
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt);
result_heap.Sort();
blockmax_result_count = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
Expand All @@ -346,7 +326,7 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
#ifdef INFINITY_DEBUG
auto ordinary_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(doc_iterator, result_heap, ordinary_loop_cnt, minimum_should_match_option_);
ExecuteFTSearch(doc_iterator, result_heap, ordinary_loop_cnt);
result_heap.Sort();
ordinary_result_count = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
Expand Down Expand Up @@ -448,6 +428,10 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
}

operator_state->SetComplete();
ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager();
if (cache_result_ && cache_mgr != nullptr) {
AddCache(query_context, cache_mgr, output_data_blocks);
}
auto finish_output_time = std::chrono::high_resolution_clock::now();
TimeDurationType output_duration = finish_output_time - begin_output_time;
LOG_DEBUG(fmt::format("PhysicalMatch Part 5: Output data time: {} ms", output_duration.count()));
Expand All @@ -466,11 +450,12 @@ PhysicalMatch::PhysicalMatch(u64 id,
const SharedPtr<CommonQueryFilter> &common_query_filter,
MinimumShouldMatchOption &&minimum_should_match_option,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kMatch, nullptr, nullptr, id, std::move(load_metas)), table_index_(match_table_index),
SharedPtr<Vector<LoadMeta>> load_metas,
bool cache_result)
: PhysicalOperator(PhysicalOperatorType::kMatch, nullptr, nullptr, id, std::move(load_metas), cache_result), table_index_(match_table_index),
base_table_ref_(std::move(base_table_ref)), match_expr_(std::move(match_expr)), index_reader_(index_reader), query_tree_(std::move(query_tree)),
begin_threshold_(begin_threshold), early_term_algo_(early_term_algo), top_n_(top_n), common_query_filter_(common_query_filter),
minimum_should_match_option_(std::move(minimum_should_match_option)){}
minimum_should_match_option_(std::move(minimum_should_match_option)) {}

PhysicalMatch::~PhysicalMatch() = default;

Expand Down Expand Up @@ -531,4 +516,21 @@ String PhysicalMatch::ToString(i64 &space) const {
return res;
}

/// Stores a copy of this match operator's output in the result cache.
///
/// The cache entry is stamped with the smaller of the transaction's begin
/// timestamp and the table entry's max commit timestamp. Every output block is
/// cloned so the cache holds its own copies. The outcome is only logged; a
/// failed insertion is not reported to the caller.
///
/// @param query_context      supplies the current transaction.
/// @param cache_mgr          cache that receives the entry.
/// @param output_data_blocks blocks produced by this operator; left untouched.
void PhysicalMatch::AddCache(QueryContext *query_context, ResultCacheManager *cache_mgr, const Vector<UniquePtr<DataBlock>> &output_data_blocks) {
    Txn *txn = query_context->GetTxn();
    TableEntry *table_entry = base_table_ref_->table_entry_ptr_;
    const TxnTimeStamp query_ts = std::min(txn->BeginTS(), table_entry->max_commit_ts());

    // Clone each block so the cached copies do not alias the operator output.
    Vector<UniquePtr<DataBlock>> cloned_blocks;
    cloned_blocks.reserve(output_data_blocks.size());
    for (const auto &block : output_data_blocks) {
        cloned_blocks.emplace_back(block->Clone());
    }

    auto cached_node = MakeUnique<CachedMatch>(query_ts, this);
    const bool inserted = cache_mgr->AddCache(std::move(cached_node), std::move(cloned_blocks));
    if (inserted) {
        LOG_INFO(fmt::format("Add cache success for query: {}", txn->BeginTS()));
    } else {
        LOG_WARN(fmt::format("Add cache failed for query: {}", txn->BeginTS()));
    }
}

} // namespace infinity
17 changes: 15 additions & 2 deletions src/executor/operator/physical_match.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import parse_fulltext_options;

namespace infinity {

class ResultCacheManager;
class DataBlock;

export class PhysicalMatch final : public PhysicalOperator {
public:
explicit PhysicalMatch(u64 id,
Expand All @@ -52,7 +55,8 @@ public:
const SharedPtr<CommonQueryFilter> &common_query_filter,
MinimumShouldMatchOption &&minimum_should_match_option,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas);
SharedPtr<Vector<LoadMeta>> load_metas,
bool cache_result);

~PhysicalMatch() override;

Expand Down Expand Up @@ -80,10 +84,19 @@ public:

[[nodiscard]] inline u64 table_index() const { return table_index_; }

[[nodiscard]] inline MatchExpression *match_expr() const { return match_expr_.get(); }
[[nodiscard]] inline const SharedPtr<MatchExpression> &match_expr() const { return match_expr_; }

SharedPtr<BaseExpression> filter_expression() const { return common_query_filter_->original_filter_; }

[[nodiscard]] inline const CommonQueryFilter *common_query_filter() const { return common_query_filter_.get(); }

const SharedPtr<BaseTableRef> &base_table_ref() const { return base_table_ref_; }

SizeT top_n() const { return top_n_; }

private:
void AddCache(QueryContext *query_context, ResultCacheManager *cache_mgr, const Vector<UniquePtr<DataBlock>> &output_data_blocks);

private:
u64 table_index_ = 0;
SharedPtr<BaseTableRef> base_table_ref_;
Expand Down
Loading

0 comments on commit e4387ac

Please sign in to comment.