Skip to content

Commit

Permalink
Fix some.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Oct 21, 2024
1 parent 25e6e52 commit 203befe
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 28 deletions.
8 changes: 5 additions & 3 deletions src/executor/operator/physical_fusion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,6 @@ void PhysicalFusion::ExecuteRRFWeighted(const Map<u64, Vector<UniquePtr<DataBloc
else
child_op = other_children_[i - 2].get();
auto child_type = child_op->operator_type();
if (child_type == PhysicalOperatorType::kReadCache) {
child_type = static_cast<PhysicalReadCache *>(child_op)->origin_type();
}
switch (child_type) {
case PhysicalOperatorType::kKnnScan: {
PhysicalKnnScan *phy_knn_scan = static_cast<PhysicalKnnScan *>(child_op);
Expand All @@ -254,6 +251,11 @@ void PhysicalFusion::ExecuteRRFWeighted(const Map<u64, Vector<UniquePtr<DataBloc
min_heaps[i] = true;
break;
}
case PhysicalOperatorType::kReadCache: {
PhysicalReadCache *phy_read_cache = static_cast<PhysicalReadCache *>(child_op);
min_heaps[i] = phy_read_cache->is_min_heap();
break;
}
default: {
String error_message = fmt::format("Cannot determine heap type of operator {}", int(child_op->operator_type()));
UnrecoverableError(error_message);
Expand Down
5 changes: 3 additions & 2 deletions src/executor/operator/physical_read_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ PhysicalReadCache::PhysicalReadCache(u64 id,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CacheContent> cache_content,
Vector<SizeT> column_map,
SharedPtr<Vector<LoadMeta>> load_metas)
SharedPtr<Vector<LoadMeta>> load_metas,
bool is_min_heap)
: PhysicalOperator(PhysicalOperatorType::kReadCache, nullptr, nullptr, id, load_metas), base_table_ref_(base_table_ref),
cache_content_(cache_content), column_map_(column_map) {
cache_content_(cache_content), column_map_(column_map), is_min_heap_(is_min_heap) {
switch (origin_type) {
case LogicalNodeType::kMatch: {
origin_type_ = PhysicalOperatorType::kMatch;
Expand Down
6 changes: 5 additions & 1 deletion src/executor/operator/physical_read_cache.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public:
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CacheContent> cache_content,
Vector<SizeT> column_map,
SharedPtr<Vector<LoadMeta>> load_metas);
SharedPtr<Vector<LoadMeta>> load_metas,
bool is_min_heap);

/// Initialization hook for this operator; the body is empty, so no work is performed.
void Init() override {}

Expand All @@ -54,13 +55,16 @@ public:

/// @return The physical operator type recorded in origin_type_ (set by the constructor
///         from the logical node type it was given).
PhysicalOperatorType origin_type() const {
    return origin_type_;
}

/// @return The heap-direction flag stored in is_min_heap_ (supplied to the constructor).
bool is_min_heap() const {
    return is_min_heap_;
}

private:
SharedPtr<BaseTableRef> base_table_ref_;

SharedPtr<CacheContent> cache_content_;

PhysicalOperatorType origin_type_;
Vector<SizeT> column_map_; // result column id -> cache column id
bool is_min_heap_;
};

} // namespace infinity
3 changes: 2 additions & 1 deletion src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,8 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildReadCache(const SharedPtr<Logi
logical_read_cache->base_table_ref_,
logical_read_cache->cache_content_,
logical_read_cache->column_map_,
logical_read_cache->load_metas());
logical_read_cache->load_metas(),
logical_read_cache->is_min_heap_);
}

UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<LogicalNode> &logical_operator) const {
Expand Down
2 changes: 1 addition & 1 deletion src/expression/cast_expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ bool CastExpression::Eq(const BaseExpression &other_base) const {
return false;
}
const auto &other = static_cast<const CastExpression &>(other_base);
return func_.function == other.func_.function && target_type_ == other.target_type_;
return func_.function == other.func_.function && arguments_[0]->Eq(*other.arguments_[0]);
}

} // namespace infinity
3 changes: 2 additions & 1 deletion src/expression/value_expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bool ValueExpression::Eq(const BaseExpression &other_base) const {
return false;
}
const auto &other = static_cast<const ValueExpression &>(other_base);
return value_ == other.value_; }
return value_ == other.value_;
}

} // namespace infinity
2 changes: 2 additions & 0 deletions src/planner/cached_node/cached_match_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public:

bool Eq(const CachedNodeBase &other) const override;

/// @return Non-owning pointer to the query expression held by query_expression_;
///         nullptr when the shared pointer is empty.
const BaseExpression *query_expression() const {
    return query_expression_.get();
}

private:
SharedPtr<BaseExpression> query_expression_{};
SharedPtr<BaseExpression> filter_expression_{};
Expand Down
5 changes: 3 additions & 2 deletions src/planner/node/logical_read_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ LogicalReadCache::LogicalReadCache(u64 node_id,
LogicalNodeType origin_type,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CacheContent> cache_content,
Vector<SizeT> column_map)
Vector<SizeT> column_map,
bool is_min_heap)
: LogicalNode(node_id, LogicalNodeType::kReadCache), origin_type_(origin_type), base_table_ref_(std::move(base_table_ref)),
cache_content_(std::move(cache_content)), column_map_(std::move(column_map)) {}
cache_content_(std::move(cache_content)), column_map_(std::move(column_map)), is_min_heap_(is_min_heap) {}

Vector<ColumnBinding> LogicalReadCache::GetColumnBindings() const {
Vector<ColumnBinding> result;
Expand Down
4 changes: 3 additions & 1 deletion src/planner/node/logical_read_cache.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public:
LogicalNodeType origin_type,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CacheContent> cache_content,
Vector<SizeT> column_map);
Vector<SizeT> column_map,
bool is_min_heap);

public:
virtual Vector<ColumnBinding> GetColumnBindings() const;
Expand All @@ -52,6 +53,7 @@ public:
SharedPtr<BaseTableRef> base_table_ref_;
SharedPtr<CacheContent> cache_content_;
Vector<SizeT> column_map_; // result column id -> cache column id
bool is_min_heap_;
};

} // namespace infinity
9 changes: 8 additions & 1 deletion src/planner/optimizer/result_cache_getter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import cached_index_scan;
import third_party;
import logger;
import base_table_ref;
import knn_expression;

namespace infinity {

Expand All @@ -48,13 +49,15 @@ void ResultCacheGetter::ApplyToPlan(QueryContext *query_context_ptr, SharedPtr<L
}
Optional<CacheOutput> cache_output;
SharedPtr<BaseTableRef> base_table_ref;
bool is_min_heap = false;
switch (op->operator_type()) {
case LogicalNodeType::kMatch: {
auto *logical_match = static_cast<LogicalMatch *>(op.get());
base_table_ref = logical_match->base_table_ref_;
TxnTimeStamp query_ts = std::min(begin_ts, logical_match->base_table_ref_->max_commit_ts());
CachedMatch cached_match(query_ts, logical_match);
cache_output = cache_mgr->GetCache(cached_match);
is_min_heap = true;
break;
}
case LogicalNodeType::kMatchTensorScan: {
Expand All @@ -63,6 +66,7 @@ void ResultCacheGetter::ApplyToPlan(QueryContext *query_context_ptr, SharedPtr<L
TxnTimeStamp query_ts = std::min(begin_ts, logical_match_tensor_scan->base_table_ref_->max_commit_ts());
CachedMatchTensorScan cached_match_tensor_scan(query_ts, logical_match_tensor_scan);
cache_output = cache_mgr->GetCache(cached_match_tensor_scan);
is_min_heap = true;
break;
}
case LogicalNodeType::kMatchSparseScan: {
Expand All @@ -71,6 +75,7 @@ void ResultCacheGetter::ApplyToPlan(QueryContext *query_context_ptr, SharedPtr<L
TxnTimeStamp query_ts = std::min(begin_ts, logical_match_sparse_scan->base_table_ref_->max_commit_ts());
CachedMatchSparseScan cached_match_sparse_scan(query_ts, logical_match_sparse_scan);
cache_output = cache_mgr->GetCache(cached_match_sparse_scan);
is_min_heap = true;
break;
}
case LogicalNodeType::kKnnScan: {
Expand All @@ -79,6 +84,7 @@ void ResultCacheGetter::ApplyToPlan(QueryContext *query_context_ptr, SharedPtr<L
TxnTimeStamp query_ts = std::min(begin_ts, logical_knn_scan->base_table_ref_->max_commit_ts());
CachedKnnScan cached_knn_scan(query_ts, logical_knn_scan);
cache_output = cache_mgr->GetCache(cached_knn_scan);
is_min_heap = static_cast<const KnnExpression *>(cached_knn_scan.query_expression())->IsKnnMinHeap();
break;
}
case LogicalNodeType::kIndexScan: {
Expand All @@ -101,7 +107,8 @@ void ResultCacheGetter::ApplyToPlan(QueryContext *query_context_ptr, SharedPtr<L
op->operator_type(),
base_table_ref,
std::move(cache_output->cache_content_),
std::move(cache_output->column_map_));
std::move(cache_output->column_map_),
is_min_heap);
logical_read_cache->set_left_node(op->left_node());
logical_read_cache->set_right_node(op->right_node());
op = logical_read_cache;
Expand Down
3 changes: 3 additions & 0 deletions src/scheduler/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,7 @@ SharedPtr<DataTable> SerialMaterializedFragmentCtx::GetResultInternal() {
result_table->UpdateRowCount(data_block->row_count());
result_table->data_blocks_.emplace_back(std::move(data_block));
}
materialize_sink_state->data_block_array_.clear();
// result_table->data_blocks_ = std::move(materialize_sink_state->data_block_array_);
return result_table;
}
Expand Down Expand Up @@ -1528,6 +1529,7 @@ SharedPtr<DataTable> ParallelMaterializedFragmentCtx::GetResultInternal() {
for (auto &result_data_block : materialize_sink_state->data_block_array_) {
result_table->Append(std::move(result_data_block));
}
materialize_sink_state->data_block_array_.clear();
}

return result_table;
Expand Down Expand Up @@ -1577,6 +1579,7 @@ SharedPtr<DataTable> ParallelStreamFragmentCtx::GetResultInternal() {
for (auto &result_data_block : materialize_sink_state->data_block_array_) {
result_table->Append(std::move(result_data_block));
}
materialize_sink_state->data_block_array_.clear();
}

return result_table;
Expand Down
8 changes: 7 additions & 1 deletion test/sql/dql/index_scan/index_scan_str.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET CONFIG result_cache_mode "off";

statement ok
DROP TABLE IF EXISTS str_index_scan_insert;

Expand Down Expand Up @@ -53,4 +56,7 @@ PROJECT (4)
- output_columns: [__rowid]

statement ok
DROP TABLE str_index_scan_insert;
DROP TABLE str_index_scan_insert;

statement ok
SET CONFIG result_cache_mode "on";
28 changes: 14 additions & 14 deletions test/sql/dql/result_cache/cached_knn_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ statement ok
COPY cached_knn_scan FROM '/var/infinity/test_data/embedding_float_dim4.csv' WITH (DELIMITER ',', FORMAT CSV);

query I
EXPLAIN SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
EXPLAIN SELECT c1, Distance() FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
----
PROJECT (3)
- table index: #4
- expressions: [c1 (#0)]
- expressions: [c1 (#0), DISTANCE (#1)]
-> KNN SCAN (2)
- table name: cached_knn_scan(default_db.cached_knn_scan)
- table index: #4
Expand All @@ -24,35 +24,35 @@ EXPLAIN SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2,
- output columns: [c1, __score, __rowid]

query I
SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
SELECT c1, Distance() FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
----
8
6
4
8 0.020000
6 0.060000
4 0.100000

query I
EXPLAIN SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
EXPLAIN SELECT c1, Distance() FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
----
PROJECT (3)
- table index: #4
- expressions: [c1 (#0)]
- expressions: [c1 (#0), DISTANCE (#1)]
-> Read cache (2)
- table name: (default_db.cached_knn_scan)
- output columns: [c1, __score, __rowid]

query I
SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
SELECT c1, Distance() FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.2], 'float', 'l2', 3);
----
8
6
4
8 0.020000
6 0.060000
4 0.100000

query I
EXPLAIN SELECT c1 FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.1], 'float', 'l2', 3);
EXPLAIN SELECT c1, Distance() FROM cached_knn_scan SEARCH MATCH VECTOR (c2, [0.3, 0.3, 0.2, 0.1], 'float', 'l2', 3);
----
PROJECT (3)
- table index: #4
- expressions: [c1 (#0)]
- expressions: [c1 (#0), DISTANCE (#1)]
-> KNN SCAN (2)
- table name: cached_knn_scan(default_db.cached_knn_scan)
- table index: #4
Expand Down

0 comments on commit 203befe

Please sign in to comment.