Skip to content

Commit

Permalink
storage: Support adding vector index in background (#203)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <breezewish@outlook.com>
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
Co-authored-by: Wish <breezewish@outlook.com>
Co-authored-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
3 people committed Aug 6, 2024
1 parent 9a9daf8 commit 3e20625
Show file tree
Hide file tree
Showing 46 changed files with 3,464 additions and 237 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
M(DT_SnapshotOfReadRaw) \
M(DT_SnapshotOfSegmentSplit) \
M(DT_SnapshotOfSegmentMerge) \
M(DT_SnapshotOfSegmentIngestIndex) \
M(DT_SnapshotOfSegmentIngest) \
M(DT_SnapshotOfDeltaMerge) \
M(DT_SnapshotOfDeltaCompact) \
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -174,6 +175,7 @@ struct ContextShared
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalPageIdAllocatorPtr global_page_id_allocator;
DM::GlobalStoragePoolPtr global_storage_pool;
DM::LocalIndexerSchedulerPtr global_local_indexer_scheduler;

/// The PS instance available on Write Node.
UniversalPageStorageServicePtr ps_write;
Expand Down Expand Up @@ -1763,6 +1765,27 @@ DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const
return shared->global_page_id_allocator;
}

bool Context::initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit)
{
auto lock = getLock();
if (!shared->global_local_indexer_scheduler)
{
shared->global_local_indexer_scheduler
= std::make_shared<DM::LocalIndexerScheduler>(DM::LocalIndexerScheduler::Options{
.pool_size = pool_size,
.memory_limit = memory_limit,
.auto_start = true,
});
}
return true;
}

DM::LocalIndexerSchedulerPtr Context::getGlobalLocalIndexerScheduler() const
{
auto lock = getLock();
return shared->global_local_indexer_scheduler;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <Interpreters/SharedContexts/Disagg_fwd.h>
#include <Interpreters/TimezoneInfo.h>
#include <Server/ServerInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler_fwd.h>
#include <Storages/KVStore/Types.h>
#include <common/MultiVersion.h>

#include <chrono>
Expand Down Expand Up @@ -457,6 +459,9 @@ class Context
bool initializeGlobalPageIdAllocator();
DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const;

bool initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit);
DM::LocalIndexerSchedulerPtr getGlobalLocalIndexerScheduler() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Expand Down
29 changes: 24 additions & 5 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,12 +980,13 @@ int Server::main(const std::vector<std::string> & /*args*/)

if (storage_config.format_version != 0)
{
if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != STORAGE_FORMAT_V100.identifier)
if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != STORAGE_FORMAT_V100.identifier
&& storage_config.format_version != STORAGE_FORMAT_V101.identifier)
{
LOG_WARNING(log, "'storage.format_version' must be set to 100 when S3 is enabled!");
LOG_WARNING(log, "'storage.format_version' must be set to 100 or 101 when S3 is enabled!");
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"'storage.format_version' must be set to 100 when S3 is enabled!");
"'storage.format_version' must be set to 100 or 101 when S3 is enabled!");
}
setStorageFormat(storage_config.format_version);
LOG_INFO(log, "Using format_version={} (explicit storage format detected).", storage_config.format_version);
Expand All @@ -996,8 +997,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
// If the user does not explicitly set format_version in the config file but
// enables S3, then we set up a proper format version to support S3.
setStorageFormat(STORAGE_FORMAT_V100.identifier);
LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V100.identifier);
setStorageFormat(STORAGE_FORMAT_V101.identifier);
LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V101.identifier);
}
else
{
Expand Down Expand Up @@ -1301,6 +1302,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity),
settings.bytes_that_rss_larger_than_limit);

if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
{
// No need to have local index scheduler.
}
else if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode())
{
global_context->initializeGlobalLocalIndexerScheduler(
server_info.cpu_info.logical_cores * 8 / 10,
server_info.memory_info.capacity * 6 / 10);
}
else
{
// There could be compute tasks, reserve more memory for computes.
global_context->initializeGlobalLocalIndexerScheduler(
server_info.cpu_info.logical_cores * 4 / 10,
server_info.memory_info.capacity * 4 / 10);
}

/// PageStorage run mode has been determined above
global_context->initializeGlobalPageIdAllocator();
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
109 changes: 94 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
Expand All @@ -59,6 +60,7 @@
#include <magic_enum.hpp>
#include <memory>


namespace ProfileEvents
{
extern const Event DMWriteBlock;
Expand Down Expand Up @@ -212,6 +214,7 @@ DeltaMergeStore::DeltaMergeStore(
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
: global_context(db_context.getGlobalContext())
Expand All @@ -228,6 +231,7 @@ DeltaMergeStore::DeltaMergeStore(
, background_pool(db_context.getBackgroundPool())
, blockable_background_pool(db_context.getBlockableBackgroundPool())
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
, local_index_infos(std::move(local_index_infos_))
, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))
{
replica_exist.store(has_replica);
Expand Down Expand Up @@ -276,9 +280,10 @@ DeltaMergeStore::DeltaMergeStore(
{
auto task = [this, dm_context, segment_id] {
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
std::lock_guard lock(read_write_mutex);
segments.emplace(segment->getRowKeyRange().getEnd(), segment);
id_to_segment.emplace(segment_id, segment);
{
std::unique_lock lock(read_write_mutex);
addSegment(lock, segment);
}
};
wait_group->schedule(task);
}
Expand All @@ -291,9 +296,10 @@ DeltaMergeStore::DeltaMergeStore(
while (segment_id != 0)
{
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
segments.emplace(segment->getRowKeyRange().getEnd(), segment);
id_to_segment.emplace(segment_id, segment);

{
std::unique_lock lock(read_write_mutex);
addSegment(lock, segment);
}
segment_id = segment->nextSegmentId();
}
}
Expand All @@ -310,6 +316,78 @@ DeltaMergeStore::DeltaMergeStore(
LOG_INFO(log, "Restore DeltaMerge Store end, ps_run_mode={}", magic_enum::enum_name(page_storage_run_mode));
}

DeltaMergeStorePtr DeltaMergeStore::create(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name_,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
{
auto * store = new DeltaMergeStore(
db_context,
data_path_contains_database_name,
db_name_,
table_name_,
keyspace_id_,
physical_table_id_,
has_replica,
columns,
handle,
is_common_handle_,
rowkey_column_size_,
local_index_infos_,
settings_,
thread_pool);
std::shared_ptr<DeltaMergeStore> store_shared_ptr(store);
store_shared_ptr->checkAllSegmentsLocalIndex();
return store_shared_ptr;
}

std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name_,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_,
ThreadPool * thread_pool)
{
auto * store = new DeltaMergeStore(
db_context,
data_path_contains_database_name,
db_name_,
table_name_,
keyspace_id_,
physical_table_id_,
has_replica,
columns,
handle,
is_common_handle_,
rowkey_column_size_,
local_index_infos_,
settings_,
thread_pool);
std::unique_ptr<DeltaMergeStore> store_unique_ptr(store);
store_unique_ptr->checkAllSegmentsLocalIndex();
return store_unique_ptr;
}

DeltaMergeStore::~DeltaMergeStore()
{
LOG_INFO(log, "Release DeltaMerge Store start");
Expand Down Expand Up @@ -381,16 +459,11 @@ void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
// The order to drop the meta and data of this segment doesn't matter,
// Because there is no segment pointing to this segment,
// so it won't be restored again even the drop process was interrupted by restart
segments.erase(segment_to_drop->getRowKeyRange().getEnd());
id_to_segment.erase(segment_id_to_drop);
removeSegment(lock, segment_to_drop);
if (previous_segment)
{
assert(new_previous_segment);
assert(previous_segment->segmentId() == new_previous_segment->segmentId());
segments.erase(previous_segment->getRowKeyRange().getEnd());
segments.emplace(new_previous_segment->getRowKeyRange().getEnd(), new_previous_segment);
id_to_segment.erase(previous_segment->segmentId());
id_to_segment.emplace(new_previous_segment->segmentId(), new_previous_segment);
replaceSegment(lock, previous_segment, new_previous_segment);
}
auto drop_lock = segment_to_drop->mustGetUpdateLock();
segment_to_drop->abandon(*dm_context);
Expand Down Expand Up @@ -433,6 +506,11 @@ void DeltaMergeStore::shutdown()
return;

LOG_TRACE(log, "Shutdown DeltaMerge start");

auto indexer_scheulder = global_context.getGlobalLocalIndexerScheduler();
RUNTIME_CHECK(indexer_scheulder != nullptr);
indexer_scheulder->dropTasks(keyspace_id, physical_table_id);

// Must shutdown storage path pool to make sure the DMFile remove callbacks
// won't remove dmfiles unexpectly.
path_pool->shutdown();
Expand Down Expand Up @@ -1924,6 +2002,8 @@ void DeltaMergeStore::applySchemaChanges(TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// TODO(local index): There could be some local indexes added/dropped after DDL

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));
}

Expand Down Expand Up @@ -2170,8 +2250,7 @@ void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context)
RowKeyRange::newAll(is_common_handle, rowkey_column_size),
segment_id,
0);
segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment);
id_to_segment.emplace(segment_id, first_segment);
addSegment(lock, first_segment);
}

} // namespace DM
Expand Down
Loading

0 comments on commit 3e20625

Please sign in to comment.