Skip to content

Commit

Permalink
enhance: interim index support index without raw data and metric typ…
Browse files Browse the repository at this point in the history
…e fp16/bf16

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
  • Loading branch information
cqy123456 committed Jan 13, 2025
1 parent 39fa00f commit 476fb9c
Show file tree
Hide file tree
Showing 26 changed files with 590 additions and 169 deletions.
1 change: 1 addition & 0 deletions client/index/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
AUTOINDEX IndexType = "AUTOINDEX"
DISKANN IndexType = "DISKANN"
SCANN IndexType = "SCANN"
SCANNDVR IndexType = "SCANN_DVR"

// Sparse
SparseInverted IndexType = "SPARSE_INVERTED_INDEX"
Expand Down
5 changes: 4 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,11 @@ queryNode:
# Milvus will eventually seals and indexes all segments, but enabling this optimizes search performance for immediate queries following data insertion.
# This defaults to true, indicating that Milvus creates temporary index for growing segments and the sealed segments that are not indexed upon searches.
enableIndex: true
nlist: 128 # temp index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
nlist: 128 # interim index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
subDim: 4 # interim index sub dim; recommended that the vector dim is divisible by subDim (vector dim % subDim == 0)
refineRatio: 3.5 # interim index parameters, should set to be >= 1.0
withRawData: true # Whether to keep raw data inside the interim index
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
multipleChunkedEnable: true # Enable multiple chunked search
Expand Down
24 changes: 24 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ VectorMemIndex<T>::VectorMemIndex(
}
}

// Data-view constructor used for the interim index: the knowhere index reads
// vectors through `view_data` instead of owning a copy, so this variant never
// holds a file_manager_ for binlog upload/download.
template <typename T>
VectorMemIndex<T>::VectorMemIndex(const IndexType& index_type,
                                  const MetricType& metric_type,
                                  const IndexVersion& version,
                                  const knowhere::ViewDataOp view_data)
    : VectorIndex(index_type, metric_type) {
    CheckMetricTypeSupport<T>(metric_type);
    AssertInfo(!is_unsupported(index_type, metric_type),
               index_type + " doesn't support metric: " + metric_type);

    auto packed_view = knowhere::Pack(view_data);
    auto created = knowhere::IndexFactory::Instance().Create<T>(
        GetIndexType(), version, packed_view);
    if (!created.has_value()) {
        // An unknown index kind surfaces as Unsupported; any other creation
        // failure is reported as a generic knowhere error. PanicInfo throws,
        // so the assignment below is reached only on success.
        PanicInfo(created.error() == knowhere::Status::invalid_index_error
                      ? ErrorCode::Unsupported
                      : ErrorCode::KnowhereError,
                  created.what());
    }
    index_ = created.value();
}

template <typename T>
knowhere::expected<std::vector<knowhere::IndexNode::IteratorPtr>>
VectorMemIndex<T>::VectorIterators(const milvus::DatasetPtr dataset,
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class VectorMemIndex : public VectorIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());

// knowhere data-view index: special constructor for the interim index, no need to hold file_manager_ to upload or download files
VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const knowhere::ViewDataOp view_data);

BinarySet
Serialize(const Config& config) override;

Expand Down
97 changes: 71 additions & 26 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "google/protobuf/message_lite.h"
#include "index/VectorIndex.h"
#include "index/VectorMemIndex.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Utils.h"
Expand Down Expand Up @@ -115,7 +116,8 @@ ChunkedSegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else if (get_bit(binlog_index_bitset_, field_id)) {
}
if (get_bit(binlog_index_bitset_, field_id)) {
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
Expand All @@ -136,8 +138,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id,
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");

if (!get_bit(index_ready_bitset_, field_id) &&
!get_bit(binlog_index_bitset_, field_id)) {
if (!get_bit(index_ready_bitset_, field_id)) {
return;
}

Expand Down Expand Up @@ -489,21 +490,13 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
insert_record_.seal_pks();
}

bool use_temp_index = false;
{
// update num_rows to build temperate binlog index
// update num_rows to build the temporary interim index
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}

if (generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
use_temp_index = true;
}

if (!use_temp_index) {
if (!generate_interim_index(field_id)) {
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
}
Expand Down Expand Up @@ -1744,6 +1737,10 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return fill_with_empty(field_id, count);
}

if (HasFieldData(field_id)) {
Assert(get_bit(field_data_ready_bitset_, field_id));
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!IsVectorDataType(field_meta.get_data_type())) {
Expand All @@ -1757,11 +1754,9 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
} else {
return fill_with_empty(field_id, count);
}

Assert(get_bit(field_data_ready_bitset_, field_id));

return get_raw_data(field_id, field_meta, seg_offsets, count);
}

std::unique_ptr<DataArray>
Expand Down Expand Up @@ -1818,15 +1813,22 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (IsVectorDataType(field_meta.get_data_type())) {
if (get_bit(index_ready_bitset_, fieldID) |
get_bit(binlog_index_bitset_, fieldID)) {
if (get_bit(index_ready_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index = dynamic_cast<index::VectorIndex*>(
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else if (get_bit(binlog_index_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
auto vec_index =
dynamic_cast<index::VectorIndex*>(field_indexing->indexing_.get());
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
Expand Down Expand Up @@ -2017,6 +2019,8 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
}
// check data type
if (field_meta.get_data_type() != DataType::VECTOR_FLOAT &&
field_meta.get_data_type() != DataType::VECTOR_FLOAT16 &&
field_meta.get_data_type() != DataType::VECTOR_BFLOAT16 &&
!is_sparse) {
return false;
}
Expand Down Expand Up @@ -2062,16 +2066,50 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
is_sparse
? dynamic_cast<ChunkedSparseFloatColumn*>(vec_data.get())->Dim()
: field_meta.get_dim();
auto index_metric = field_binlog_config->GetMetricType();
std::unique_ptr<index::VectorIndex> vec_index = nullptr;
if (!is_sparse) {
knowhere::ViewDataOp view_data = [field_raw_data_ptr =
vec_data](size_t id) {
return field_raw_data_ptr->ValueAt(id);
};
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
vec_index =
std::make_unique<index::VectorMemIndex<knowhere::fp16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
} else if (field_meta.get_data_type() ==
DataType::VECTOR_BFLOAT16) {
vec_index =
std::make_unique<index::VectorMemIndex<knowhere::bf16>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
view_data);
}
} else {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");
return false;
}

auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
auto index_metric = field_binlog_config->GetMetricType();

auto vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
auto num_chunk = vec_data->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto dataset = knowhere::GenDataSet(
Expand All @@ -2088,19 +2126,26 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {

if (enable_binlog_index()) {
std::unique_lock lck(mutex_);
if (vec_index->HasRawData()) {
// some knowhere view-data indexes do not hold raw data; keep the raw field data in that case
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else {
set_bit(field_data_ready_bitset_, field_id, true);
}
vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index));

vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true);
LOG_INFO(
"replace binlog with binlog index in segment {}, field {}.",
"replace binlog with intermin index in segment {}, field {}.",
this->get_segment_id(),
field_id.get());
}
return true;
} catch (std::exception& e) {
LOG_WARN("fail to generate binlog index, because {}", e.what());
LOG_WARN("fail to generate intermin index, because {}", e.what());
return false;
}
}
Expand Down
13 changes: 13 additions & 0 deletions internal/core/src/segcore/ConcurrentVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,17 @@ class ConcurrentVector<BFloat16Vector>
}
};

// Returns true iff `vec_base` actually is the ConcurrentVector specialization
// matching the given dense vector data type (float / fp16 / bf16). Returns
// false for a null pointer, a mismatched storage type, or any other data type
// (including sparse vectors).
//
// Defined `inline` rather than `static`: this helper lives in a header, and
// internal linkage would duplicate the definition in every translation unit
// (and trigger unused-function warnings where it is not called).
inline bool
ConcurrentDenseVectorCheck(const VectorBase* vec_base, DataType data_type) {
    switch (data_type) {
        case DataType::VECTOR_FLOAT:
            return dynamic_cast<const ConcurrentVector<FloatVector>*>(
                       vec_base) != nullptr;
        case DataType::VECTOR_FLOAT16:
            return dynamic_cast<const ConcurrentVector<Float16Vector>*>(
                       vec_base) != nullptr;
        case DataType::VECTOR_BFLOAT16:
            return dynamic_cast<const ConcurrentVector<BFloat16Vector>*>(
                       vec_base) != nullptr;
        default:
            return false;
    }
}

} // namespace milvus::segcore
Loading

0 comments on commit 476fb9c

Please sign in to comment.