Skip to content

Commit

Permalink
Dump bmp index (#2367)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Dump BMP Index by memory usage.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases

---------

Co-authored-by: shen yushi <shenyushi99@qq.com>
  • Loading branch information
vsian and small-turtle-1 authored Dec 13, 2024
1 parent de0f9b3 commit 349d881
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 32 deletions.
108 changes: 108 additions & 0 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import pathlib
from infinity.common import ConflictType, SparseVector
import pytest


class TestMemIdx:
Expand Down Expand Up @@ -339,6 +340,113 @@ def check(rows):

part3()

# @pytest.mark.skip(reason="bug")
def test_mem_bmp(self, infinity_runner: InfinityRunner):
config1 = "test/data/config/restart_test/test_memidx/1.toml"
config2 = "test/data/config/restart_test/test_memidx/2.toml"
config3 = "test/data/config/restart_test/test_memidx/3.toml"
uri = common_values.TEST_LOCAL_HOST
infinity_runner.clear()

test_data = [
{"c1" : 1, "c2" : SparseVector(indices=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90], values=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0])},
{"c1" : 2, "c2" : SparseVector(indices=[0, 20, 40, 60, 80], values=[2.0, 2.0, 2.0, 2.0, 2.0])},
{"c1" : 3, "c2" : SparseVector(indices=[0, 30, 60, 90], values=[3.0, 3.0, 3.0, 3.0])},
{"c1" : 4, "c2" : SparseVector(indices=[0, 40, 80], values=[4.0, 4.0, 4.0])},
{"c1" : 5, "c2" : SparseVector(indices=[0], values=[0.0])},
]
query_vector = SparseVector(indices=[0, 20, 80], values=[1.0, 2.0, 3.0])

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.create_table(
"test_mem_bmp",
{"c1": {"type": "int"}, "c2": {"type": "sparse,100,float,int"}},
)
res = table_obj.create_index(
"idx1",
index.IndexInfo(
"c2",
index.IndexType.BMP,
{"BLOCK_SIZE": "8", "COMPRESS_TYPE": "compress"},
),
)
assert res.error_code == infinity.ErrorCode.OK

# trigger dump
for i in range(7):
table_obj.insert(test_data)

part1()

# config1 can hold 51 rows of ivf mem index before dump
# 1. recover by dumpindex wal & memindex recovery
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

@decorator2
def part2(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_bmp")
data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [35]

data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 8)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 2]

data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [35]

for i in range(3):
table_obj.insert(test_data)
time.sleep(5)

data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 11)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 2]

part2()

# 2. recover by delta ckp & dumpindex wal & memindex recovery
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

@decorator3
def part3(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_bmp")

def check():
data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 11)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 2]

data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
assert data_dict["count(star)"] == [50]

check()
infinity_obj.optimize("default_db", "test_mem_bmp", optimize_opt=None)
check()

db_obj.drop_table("test_mem_bmp")

part3()

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
infinity_runner.clear()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,17 @@ void PhysicalMatchSparseScan::ExecuteInnerT(DistFunc *dist_func,
const auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetBMPIndexSnapshot();
for (SizeT query_id = 0; query_id < query_n; ++query_id) {
for (auto chunk_index_entry : chunk_index_entries) {
if (!chunk_index_entry->CheckVisible(txn)) {
continue;
}
BufferHandle buffer_handle = chunk_index_entry->GetIndex();
const auto *bmp_index = reinterpret_cast<const AbstractBMP *>(buffer_handle.GetData());
bmp_search(*bmp_index, query_id, false, filter);
LOG_TRACE(fmt::format("Search Match Sparse in chunk {}", chunk_index_entry->encode()));
}
if (memory_index_entry.get() != nullptr) {
bmp_search(memory_index_entry->get(), query_id, true, filter);
LOG_TRACE(fmt::format("Search Match Sparse in mem index of {}", segment_index_entry->encode()));
}
}
};
Expand Down
6 changes: 5 additions & 1 deletion src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ class IVFIndexInMemT final : public IVFIndexInMem {
}
}

~IVFIndexInMemT() { BaseMemIndex::DecreaseMemoryUsageBase(MemoryUsed()); }
~IVFIndexInMemT() {
if (own_ivf_index_storage_) {
DecreaseMemoryUsageBase(MemoryUsed());
}
}

MemIndexTracerInfo GetInfo() const override {
auto *table_index_entry = segment_index_entry_->table_index_entry();
Expand Down
45 changes: 39 additions & 6 deletions src/storage/knn_index/sparse/abstract_bmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,35 @@ import sparse_util;
import segment_iter;
import segment_entry;
import infinity_exception;
import third_party;
import logger;

namespace infinity {

BMPIndexInMem::BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def)
: begin_row_id_(begin_row_id), bmp_(InitAbstractIndex(index_base, column_def)) {
MemIndexTracerInfo BMPIndexInMem::GetInfo() const {
auto *table_index_entry = segment_index_entry_->table_index_entry();
SharedPtr<String> index_name = table_index_entry->GetIndexName();
auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry();
SharedPtr<String> table_name = table_entry->GetTableName();
SharedPtr<String> db_name = table_entry->GetDBName();

auto [mem_used, row_cnt] = std::visit(
[](auto &&index) -> Pair<SizeT, SizeT> {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return {};
} else {
return {index->MemoryUsage(), index->DocNum()};
}
},
bmp_);
return MemIndexTracerInfo(index_name, table_name, db_name, mem_used, row_cnt);
}

TableIndexEntry *BMPIndexInMem::table_index_entry() const { return segment_index_entry_->table_index_entry(); }

BMPIndexInMem::BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def, SegmentIndexEntry *segment_index_entry)
: begin_row_id_(begin_row_id), bmp_(InitAbstractIndex(index_base, column_def)), segment_index_entry_(segment_index_entry) {
const auto *index_bmp = static_cast<const IndexBMP *>(index_base);
const auto *sparse_info = static_cast<SparseInfo *>(column_def->type()->type_info().get());
SizeT term_num = sparse_info->Dimension();
Expand Down Expand Up @@ -68,14 +92,16 @@ BMPIndexInMem::~BMPIndexInMem() {
return;
}
std::visit(
[](auto &&index) {
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
SizeT mem_used = index->MemoryUsage();
if (index != nullptr) {
delete index;
}
DecreaseMemoryUsageBase(mem_used);
}
},
bmp_);
Expand All @@ -94,6 +120,7 @@ SizeT BMPIndexInMem::GetRowCount() const {
bmp_);
}

// realtime insert, trace this
void BMPIndexInMem::AddDocs(SizeT block_offset, BlockColumnEntry *block_column_entry, BufferManager *buffer_mgr, SizeT row_offset, SizeT row_count) {
std::visit(
[&](auto &&index) {
Expand All @@ -103,9 +130,12 @@ void BMPIndexInMem::AddDocs(SizeT block_offset, BlockColumnEntry *block_column_e
} else {
using IndexT = std::decay_t<decltype(*index)>;
using SparseRefT = SparseVecRef<typename IndexT::DataT, typename IndexT::IdxT>;

SizeT mem_before = index->MemoryUsage();
MemIndexInserterIter<SparseRefT> iter(block_offset, block_column_entry, buffer_mgr, row_offset, row_count);
index->AddDocs(std::move(iter));
SizeT mem_after = index->MemoryUsage();
IncreaseMemoryUsageBase(mem_after - mem_before);
LOG_INFO(fmt::format("before : {} -> after : {}, add mem_used : {}", mem_before, mem_after, mem_after - mem_before));
}
},
bmp_);
Expand Down Expand Up @@ -133,7 +163,7 @@ void BMPIndexInMem::AddDocs(const SegmentEntry *segment_entry, BufferManager *bu
bmp_);
}

SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const {
SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr, SizeT *dump_size) {
if (!own_memory_) {
UnrecoverableError("BMPIndexInMem::Dump() called with own_memory_ = false.");
}
Expand All @@ -147,6 +177,9 @@ SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_
} else {
row_count = index->DocNum();
index_size = index->GetSizeInBytes();
if (dump_size != nullptr) {
*dump_size = index->MemoryUsage();
}
}
},
bmp_);
Expand All @@ -160,4 +193,4 @@ SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_
return new_chunk_index_entry;
}

} // namespace infinity
} // namespace infinity
16 changes: 12 additions & 4 deletions src/storage/knn_index/sparse/abstract_bmp.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import index_bmp;
import sparse_info;
import internal_types;
import buffer_handle;
import base_memindex;
import memindex_tracer;
import table_index_entry;

namespace infinity {

Expand All @@ -52,11 +55,15 @@ export using AbstractBMP = std::variant<BMPAlg<f32, i32, BMPCompressType::kCompr
BMPAlg<f64, i8, BMPCompressType::kRaw> *,
std::nullptr_t>;

export struct BMPIndexInMem {
export struct BMPIndexInMem final : public BaseMemIndex {
public:
BMPIndexInMem() : bmp_(nullptr) {}

BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def);
BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def, SegmentIndexEntry *segment_index_entry);

MemIndexTracerInfo GetInfo() const override;

TableIndexEntry *table_index_entry() const override;

private:
template <typename DataType, typename IndexType>
Expand Down Expand Up @@ -112,13 +119,14 @@ public:

AbstractBMP &get_ref() { return bmp_; }

SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const ;
SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr, SizeT *dump_size = nullptr);

private:
RowID begin_row_id_ = {};
AbstractBMP bmp_ = nullptr;
mutable bool own_memory_ = true;
mutable BufferHandle chunk_handle_{};
SegmentIndexEntry *segment_index_entry_;
};

} // namespace infinity
} // namespace infinity
19 changes: 13 additions & 6 deletions src/storage/knn_index/sparse/bmp_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

module;

#include "common/simd/simd_common_intrin_include.h"
#include <algorithm>
#include <vector>
#include "common/simd/simd_common_intrin_include.h"

module bmp_alg;

Expand All @@ -25,12 +25,15 @@ import third_party;
import serialize;
import segment_iter;
import bp_reordering;
import bmp_blockterms;

namespace infinity {

template <typename DataType, BMPCompressType CompressType>
template <typename IdxType>
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<Pair<Vector<IdxType>, Vector<DataType>>> &tail_terms) {
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id,
const Vector<Pair<Vector<IdxType>, Vector<DataType>>> &tail_terms,
SizeT &mem_usage) {
HashMap<IdxType, DataType> max_scores;
for (const auto &[indices, data] : tail_terms) {
SizeT block_size = indices.size();
Expand All @@ -41,7 +44,7 @@ void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<
}
}
for (const auto &[term_id, score] : max_scores) {
postings_[term_id].data_.AddBlock(block_id, score);
postings_[term_id].data_.AddBlock(block_id, score, mem_usage);
}
}

Expand Down Expand Up @@ -144,7 +147,7 @@ template class TailFwd<f64, i16>;
template class TailFwd<f64, i8>;

template <typename DataType, typename IdxType>
Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc) {
Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc, SizeT &mem_usage) {
SizeT tail_size = tail_fwd_.AddDoc(doc);
if (tail_size < block_size_) {
return None;
Expand All @@ -154,6 +157,7 @@ Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const S

Vector<Tuple<IdxType, Vector<BMPBlockOffset>, Vector<DataType>>> block_terms = tail_fwd1.ToBlockFwd();
block_terms_list_.emplace_back(block_terms);
mem_usage += block_terms_list_.back().GetSizeInBytes();
return tail_fwd1;
}

Expand Down Expand Up @@ -242,14 +246,17 @@ void BMPAlg<DataType, IdxType, CompressType>::AddDoc(const SparseVecRef<DataType
lock = std::unique_lock(mtx_);
}

SizeT mem_usage = 0;
doc_ids_.push_back(doc_id);
Optional<TailFwd<DataType, IdxType>> tail_fwd = block_fwd_.AddDoc(doc);
Optional<TailFwd<DataType, IdxType>> tail_fwd = block_fwd_.AddDoc(doc, mem_usage);
if (!tail_fwd.has_value()) {
mem_usage_.fetch_add(sizeof(BMPDocID) + mem_usage);
return;
}
BMPBlockID block_id = block_fwd_.block_num() - 1;
const auto &tail_terms = tail_fwd->GetTailTerms();
bm_ivt_.AddBlock(block_id, tail_terms);
bm_ivt_.AddBlock(block_id, tail_terms, mem_usage);
mem_usage_.fetch_add(sizeof(BMPDocID) + mem_usage);
}

template <typename DataType, typename IdxType, BMPCompressType CompressType>
Expand Down
Loading

0 comments on commit 349d881

Please sign in to comment.