Skip to content

Commit

Permalink
storage: Refine local index for more index kinds (#9774)
Browse files Browse the repository at this point in the history
ref #9032

Signed-off-by: Wish <breezewish@outlook.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
breezewish and ti-chi-bot[bot] authored Jan 20, 2025
1 parent f56fb91 commit a381c31
Show file tree
Hide file tree
Showing 46 changed files with 703 additions and 410 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace DB
M(force_wait_index_timeout) \
M(force_local_index_task_memory_limit_exceeded) \
M(exception_build_local_index_for_file) \
M(force_not_support_vector_index) \
M(force_not_support_local_index) \
M(sync_schema_request_failure)

#define APPLY_FOR_FAILPOINTS(M) \
Expand Down
88 changes: 70 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,45 @@
namespace DB::DM
{

namespace details
{

inline dtpb::ColumnFileIndexInfo migrateFromIndexInfoV1(const dtpb::ColumnFileIndexInfo & index_pb)
{
RUNTIME_CHECK(index_pb.has_deprecated_vector_index());

auto idx_info = dtpb::ColumnFileIndexInfo{};
idx_info.set_index_page_id(index_pb.index_page_id());
auto * idx_props = idx_info.mutable_index_props();
idx_props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
idx_props->set_index_id(index_pb.deprecated_vector_index().index_id());
idx_props->set_file_size(index_pb.deprecated_vector_index().index_bytes());
auto * vec_idx = idx_props->mutable_vector_index();
vec_idx->set_format_version(0);
vec_idx->set_dimensions(index_pb.deprecated_vector_index().dimensions());
vec_idx->set_distance_metric(index_pb.deprecated_vector_index().distance_metric());
return idx_info;
}

inline void integrityCheckIndexInfoV2(const dtpb::ColumnFileIndexInfo & index_info)
{
RUNTIME_CHECK(index_info.has_index_page_id());
RUNTIME_CHECK(index_info.index_props().has_file_size());
RUNTIME_CHECK(index_info.index_props().has_index_id());
RUNTIME_CHECK(index_info.index_props().has_kind());
switch (index_info.index_props().kind())
{
case dtpb::IndexFileKind::VECTOR_INDEX:
RUNTIME_CHECK(index_info.index_props().has_vector_index());
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index_info.index_props().kind()));
}
}

} // namespace details


ColumnFileReaderPtr ColumnFileTiny::getReader(
const DMContext &,
const IColumnFileDataProviderPtr & data_provider,
Expand Down Expand Up @@ -61,10 +100,12 @@ void ColumnFileTiny::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool s

for (const auto & index_info : *index_infos)
{
// Just some integrity checks to ensure we are writing correct data.
// These data may come from deserialization, or generated in runtime.
details::integrityCheckIndexInfoV2(index_info);

auto * index_pb = tiny_pb->add_indexes();
index_pb->set_index_page_id(index_info.index_page_id);
if (index_info.vector_index.has_value())
index_pb->mutable_vector_index()->CopyFrom(*index_info.vector_index);
index_pb->CopyFrom(index_info);
}
}

Expand Down Expand Up @@ -121,10 +162,16 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
// Old format compatibility
if unlikely (index_pb.has_deprecated_vector_index())
{
auto idx_info = details::migrateFromIndexInfoV1(index_pb);
index_infos->emplace_back(std::move(idx_info));
continue;
}

details::integrityCheckIndexInfoV2(index_pb);
index_infos->emplace_back(index_pb);
}

return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context, index_infos);
Expand Down Expand Up @@ -178,13 +225,13 @@ ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(

// Write index data page to local ps
auto new_index_infos = std::make_shared<IndexInfos>();
for (const auto & index : *index_infos)
for (const auto & index_info : *index_infos)
{
auto new_index_page_id = put_remote_page(index.index_page_id);
if (index.vector_index)
new_index_infos->emplace_back(new_index_page_id, index.vector_index);
else
new_index_infos->emplace_back(new_index_page_id, std::nullopt);
details::integrityCheckIndexInfoV2(index_info);
auto new_index_page_id = put_remote_page(index_info.index_page_id());
auto new_index_info = index_info;
new_index_info.set_index_page_id(new_index_page_id);
new_index_infos->emplace_back(std::move(new_index_info));
}
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context, new_index_infos);
}
Expand Down Expand Up @@ -235,10 +282,15 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
// Old format compatibility
if unlikely (index_pb.has_deprecated_vector_index())
{
auto idx_info = details::migrateFromIndexInfoV1(index_pb);
index_infos->emplace_back(std::move(idx_info));
continue;
}

index_infos->emplace_back(index_pb);
}

return {
Expand Down Expand Up @@ -343,7 +395,7 @@ void ColumnFileTiny::removeData(WriteBatches & wbs) const
if (index_infos)
{
for (const auto & index_info : *index_infos)
wbs.removed_log.delPage(index_info.index_page_id);
wbs.removed_log.delPage(index_info.index_page_id());
}
}

Expand Down
19 changes: 3 additions & 16 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
#include <Storages/DeltaMerge/dtpb/column_file.pb.h>
#include <Storages/DeltaMerge/dtpb/vector_index.pb.h>
#include <Storages/Page/PageStorage_fwd.h>

namespace DB::DM
Expand All @@ -37,21 +36,11 @@ class ColumnFileTiny : public ColumnFilePersisted
{
public:
friend class ColumnFileTinyReader;
friend class ColumnFileTinyVectorIndexWriter;
friend class ColumnFileTinyLocalIndexWriter;
friend class ColumnFileTinyVectorIndexReader;
friend struct Remote::Serializer;

struct IndexInfo
{
IndexInfo(PageIdU64 page_id, std::optional<dtpb::VectorIndexFileProps> vec_index)
: index_page_id(page_id)
, vector_index(vec_index)
{}

PageIdU64 index_page_id{};
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
};
using IndexInfos = std::vector<IndexInfo>;
using IndexInfos = std::vector<dtpb::ColumnFileIndexInfo>;
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

private:
Expand Down Expand Up @@ -99,9 +88,7 @@ class ColumnFileTiny : public ColumnFilePersisted
if (!index_infos)
return false;
return std::any_of(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) {
if (!info.vector_index)
return false;
return info.vector_index->index_id() == index_id;
return info.index_props().index_id() == index_id;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include <IO/Compression/CompressedWriteBuffer.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>

Expand All @@ -28,7 +28,7 @@ extern const int ABORTED;
namespace DB::DM
{

ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo(
ColumnFileTinyLocalIndexWriter::LocalIndexBuildInfo ColumnFileTinyLocalIndexWriter::getLocalIndexBuildInfo(
const LocalIndexInfosSnapshot & index_infos,
const ColumnFilePersistedSetPtr & file_set)
{
Expand Down Expand Up @@ -78,7 +78,7 @@ ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWr
return build;
}

ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const ColumnDefines & column_defines,
const ColumnDefine & del_cd,
const ColumnFileTiny * file,
Expand All @@ -91,17 +91,27 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
read_columns->reserve(options.index_infos->size() + 1);
read_columns->push_back(del_cd);

std::unordered_map<ColId, std::vector<VectorIndexBuilderPtr>> index_builders;
struct IndexToBuild
{
LocalIndexInfo info;
VectorIndexBuilderPtr builder_vector;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;

std::unordered_map<ColId, std::vector<LocalIndexInfo>> col_indexes;
for (const auto & index_info : *options.index_infos)
{
if (index_info.type != IndexType::Vector)
// Just skip if the index is already built
if (file->hasIndex(index_info.index_id))
continue;
col_indexes[index_info.column_id].emplace_back(index_info);
RUNTIME_CHECK(index_info.def_vector_index != nullptr);
index_builders[index_info.column_id].emplace_back(IndexToBuild{
.info = index_info,
.builder_vector = {},
});
}

for (const auto & [col_id, index_infos] : col_indexes)
for (auto & [col_id, indexes] : index_builders)
{
// Make sure the column_id is in the schema.
const auto cd_iter = std::find_if( //
Expand All @@ -114,14 +124,17 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
col_id,
file->getDataPageId());

for (const auto & idx_info : index_infos)
for (auto & index : indexes)
{
// Just skip if the index is already built
if (file->hasIndex(idx_info.index_id))
continue;

index_builders[col_id].emplace_back(
VectorIndexBuilder::create(idx_info.index_id, idx_info.index_definition));
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexBuilder::create(index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
read_columns->push_back(*cd_iter);
}
Expand Down Expand Up @@ -156,9 +169,18 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns->at(col_idx).id);
const auto & col = col_with_type_and_name.column;
for (const auto & index_builder : index_builders[read_columns->at(col_idx).id])
for (const auto & index : index_builders[read_columns->at(col_idx).id])
{
index_builder->addBlock(*col, del_mark, should_proceed);
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
}
}
Expand All @@ -168,26 +190,42 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
for (size_t col_idx = 1; col_idx < num_cols; ++col_idx)
{
const auto & cd = read_columns->at(col_idx);
for (const auto & index_builder : index_builders[cd.id])
for (const auto & index : index_builders[cd.id])
{
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index_builder->saveToBuffer(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

dtpb::VectorIndexFileProps vector_index;
vector_index.set_index_id(index_builder->index_id);
vector_index.set_index_bytes(data_size);
vector_index.set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind));
vector_index.set_distance_metric(
tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric));
vector_index.set_dimensions(index_builder->definition->dimension);
index_infos->emplace_back(index_page_id, vector_index);
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
RUNTIME_CHECK(index.builder_vector);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index.builder_vector->saveToBuffer(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

auto idx_info = dtpb::ColumnFileIndexInfo{};
idx_info.set_index_page_id(index_page_id);
auto * idx_props = idx_info.mutable_index_props();
idx_props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
idx_props->set_index_id(index.info.index_id);
idx_props->set_file_size(data_size);
auto * vector_index = idx_props->mutable_vector_index();
vector_index->set_format_version(0);
vector_index->set_dimensions(index.info.def_vector_index->dimension);
vector_index->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
index_infos->emplace_back(std::move(idx_info));

break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
}

Expand All @@ -200,7 +238,7 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
return file->cloneWith(file->getDataPageId(), index_infos);
}

ColumnFileTinys ColumnFileTinyVectorIndexWriter::build(ProceedCheckFn should_proceed) const
ColumnFileTinys ColumnFileTinyLocalIndexWriter::build(ProceedCheckFn should_proceed) const
{
ColumnFileTinys new_files;
new_files.reserve(options.files.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace DB::DM

using ColumnFileTinys = std::vector<ColumnFileTinyPtr>;

// ColumnFileTinyVectorIndexWriter write vector index store in PageStorage for ColumnFileTiny.
class ColumnFileTinyVectorIndexWriter
// ColumnFileTinyLocalIndexWriter write local index store in PageStorage for ColumnFileTiny.
class ColumnFileTinyLocalIndexWriter
{
public:
struct LocalIndexBuildInfo
Expand All @@ -54,7 +54,7 @@ class ColumnFileTinyVectorIndexWriter
WriteBatches & wbs; // Write index and modify meta in the same batch.
};

explicit ColumnFileTinyVectorIndexWriter(const Options & options)
explicit ColumnFileTinyLocalIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}
Expand Down
Loading

0 comments on commit a381c31

Please sign in to comment.