Skip to content

Commit

Permalink
storage: refine DeltaMergeStore (#9380)
Browse files Browse the repository at this point in the history
ref #6233

storage: refine DeltaMergeStore

Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
Lloyd-Pottiger and ti-chi-bot[bot] committed Aug 29, 2024
1 parent 637501a commit 4cc2c02
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 81 deletions.
2 changes: 1 addition & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions
add_library(tiflash_common_io ${SPLIT_SHARED} ${tiflash_common_io_headers} ${tiflash_common_io_sources})
target_link_libraries(tiflash_common_io PUBLIC process_metrics)
target_include_directories(tiflash_common_io BEFORE PUBLIC ${XXHASH_INCLUDE_DIR} ${TiFlash_SOURCE_DIR}/libs/libprocess_metrics/include)
target_include_directories(tiflash_common_io PUBLIC ${TiFlash_SOURCE_DIR}/contrib/not_null/include)

if (OS_FREEBSD)
target_compile_definitions (tiflash_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
Expand All @@ -153,7 +154,6 @@ else ()
install (TARGETS dbms LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT tiflash)
endif ()

target_include_directories(dbms PUBLIC ${TiFlash_SOURCE_DIR}/contrib/not_null/include)
target_include_directories(dbms PUBLIC "${TIFLASH_PROXY_INCLUDE_DIR}")

if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Common/NotNull.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <Common/Exception.h>

#include <memory>
#include <not_null.hpp>

namespace DB
Expand All @@ -30,7 +29,7 @@ using NotNull = cpp::bitwizeshift::not_null<T>;
template <class T>
auto newNotNull(T && ptr)
{
return cpp::bitwizeshift::check_not_null(std::move(ptr));
return cpp::bitwizeshift::check_not_null(std::forward<T>(ptr));
}

template <class T, class... Args>
Expand All @@ -51,4 +50,4 @@ using NotNullShared = cpp::bitwizeshift::not_null<std::shared_ptr<T>>;
template <typename T>
using NotNullUnique = cpp::bitwizeshift::not_null<std::unique_ptr<T>>;

}; // namespace DB
}; // namespace DB
92 changes: 77 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ DeltaMergeStore::DeltaMergeStore(
{
auto task = [this, dm_context, segment_id] {
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
std::lock_guard lock(read_write_mutex);
segments.emplace(segment->getRowKeyRange().getEnd(), segment);
id_to_segment.emplace(segment_id, segment);
{
std::unique_lock lock(read_write_mutex);
addSegment(lock, segment);
}
};
wait_group->schedule(task);
}
Expand All @@ -299,9 +300,10 @@ DeltaMergeStore::DeltaMergeStore(
while (segment_id != 0)
{
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
segments.emplace(segment->getRowKeyRange().getEnd(), segment);
id_to_segment.emplace(segment_id, segment);

{
std::unique_lock lock(read_write_mutex);
addSegment(lock, segment);
}
segment_id = segment->nextSegmentId();
}
}
Expand All @@ -318,6 +320,72 @@ DeltaMergeStore::DeltaMergeStore(
LOG_INFO(log, "Restore DeltaMerge Store end, ps_run_mode={}", magic_enum::enum_name(page_storage_run_mode));
}

DeltaMergeStorePtr DeltaMergeStore::create(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name_,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_,
ThreadPool * thread_pool)
{
auto * store = new DeltaMergeStore(
db_context,
data_path_contains_database_name,
db_name_,
table_name_,
keyspace_id_,
physical_table_id_,
has_replica,
columns,
handle,
is_common_handle_,
rowkey_column_size_,
settings_,
thread_pool);
std::shared_ptr<DeltaMergeStore> store_shared_ptr(store);
return store_shared_ptr;
}

std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name_,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_,
ThreadPool * thread_pool)
{
auto * store = new DeltaMergeStore(
db_context,
data_path_contains_database_name,
db_name_,
table_name_,
keyspace_id_,
physical_table_id_,
has_replica,
columns,
handle,
is_common_handle_,
rowkey_column_size_,
settings_,
thread_pool);
std::unique_ptr<DeltaMergeStore> store_unique_ptr(store);
return store_unique_ptr;
}

DeltaMergeStore::~DeltaMergeStore()
{
LOG_INFO(log, "Release DeltaMerge Store start");
Expand Down Expand Up @@ -389,16 +457,11 @@ void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
// The order to drop the meta and data of this segment doesn't matter,
// Because there is no segment pointing to this segment,
// so it won't be restored again even the drop process was interrupted by restart
segments.erase(segment_to_drop->getRowKeyRange().getEnd());
id_to_segment.erase(segment_id_to_drop);
removeSegment(lock, segment_to_drop);
if (previous_segment)
{
assert(new_previous_segment);
assert(previous_segment->segmentId() == new_previous_segment->segmentId());
segments.erase(previous_segment->getRowKeyRange().getEnd());
segments.emplace(new_previous_segment->getRowKeyRange().getEnd(), new_previous_segment);
id_to_segment.erase(previous_segment->segmentId());
id_to_segment.emplace(new_previous_segment->segmentId(), new_previous_segment);
replaceSegment(lock, previous_segment, new_previous_segment);
}
auto drop_lock = segment_to_drop->mustGetUpdateLock();
segment_to_drop->abandon(*dm_context);
Expand Down Expand Up @@ -2215,8 +2278,7 @@ void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context)
RowKeyRange::newAll(is_common_handle, rowkey_column_size),
segment_id,
0);
segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment);
id_to_segment.emplace(segment_id, first_segment);
addSegment(lock, first_segment);
}

} // namespace DM
Expand Down
60 changes: 59 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ struct StoreStats
UInt64 background_tasks_length = 0;
};

class DeltaMergeStore;
using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

class DeltaMergeStore : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -260,8 +263,27 @@ class DeltaMergeStore : private boost::noncopyable
BackgroundTask nextTask(bool is_heavy, const LoggerPtr & log_);
};

private:
// Let the constructor be private, so that we can control the creation of DeltaMergeStore.
// Please use DeltaMergeStore::create to create a DeltaMergeStore
DeltaMergeStore(
Context & db_context, //
Context & db_context,
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

public:
static DeltaMergeStorePtr create(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
Expand All @@ -274,6 +296,22 @@ class DeltaMergeStore : private boost::noncopyable
size_t rowkey_column_size_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

static std::unique_ptr<DeltaMergeStore> createUnique(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

~DeltaMergeStore();

void setUpBackgroundTask(const DMContextPtr & dm_context);
Expand Down Expand Up @@ -765,6 +803,26 @@ class DeltaMergeStore : private boost::noncopyable
bool try_split_task = true);

private:
/**
* Remove the segment from the store's memory structure.
* Not protected by lock, should accquire lock before calling this function.
*/
void removeSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment);
/**
* Add the segment to the store's memory structure.
* Not protected by lock, should accquire lock before calling this function.
*/
void addSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment);
/**
* Replace the old segment with the new segment in the store's memory structure.
* New segment should have the same segment id as the old segment.
* Not protected by lock, should accquire lock before calling this function.
*/
void replaceSegment(
std::unique_lock<std::shared_mutex> &,
const SegmentPtr & old_segment,
const SegmentPtr & new_segment);

/**
* Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc.
* If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`).
Expand Down
72 changes: 44 additions & 28 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,41 @@ extern const Metric DT_SnapshotOfDeltaMerge;
extern const Metric DT_SnapshotOfSegmentIngest;
} // namespace CurrentMetrics

namespace DB
namespace DB::DM
{
namespace DM

void DeltaMergeStore::removeSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
segments.erase(segment->getRowKeyRange().getEnd());
id_to_segment.erase(segment->segmentId());
}

void DeltaMergeStore::addSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
RUNTIME_CHECK_MSG(
!segments.contains(segment->getRowKeyRange().getEnd()),
"Trying to add segment {} but there is a segment with the same key exists. Old segment must be removed "
"before adding new.",
segment->simpleInfo());
segments[segment->getRowKeyRange().getEnd()] = segment;
id_to_segment[segment->segmentId()] = segment;
}

void DeltaMergeStore::replaceSegment(
std::unique_lock<std::shared_mutex> &,
const SegmentPtr & old_segment,
const SegmentPtr & new_segment)
{
RUNTIME_CHECK(
old_segment->segmentId() == new_segment->segmentId(),
old_segment->segmentId(),
new_segment->segmentId());
segments.erase(old_segment->getRowKeyRange().getEnd());

segments[new_segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[new_segment->segmentId()] = new_segment;
}

SegmentPair DeltaMergeStore::segmentSplit(
DMContext & dm_context,
const SegmentPtr & segment,
Expand Down Expand Up @@ -180,14 +211,9 @@ SegmentPair DeltaMergeStore::segmentSplit(
wbs.writeMeta();

segment->abandon(dm_context);
segments.erase(range.getEnd());
id_to_segment.erase(segment->segmentId());

segments[new_left->getRowKeyRange().getEnd()] = new_left;
segments[new_right->getRowKeyRange().getEnd()] = new_right;

id_to_segment.emplace(new_left->segmentId(), new_left);
id_to_segment.emplace(new_right->segmentId(), new_right);
removeSegment(lock, segment);
addSegment(lock, new_left);
addSegment(lock, new_right);

if constexpr (DM_RUN_CHECK)
{
Expand Down Expand Up @@ -338,12 +364,10 @@ SegmentPtr DeltaMergeStore::segmentMerge(
for (const auto & seg : ordered_segments)
{
seg->abandon(dm_context);
segments.erase(seg->getRowKeyRange().getEnd());
id_to_segment.erase(seg->segmentId());
removeSegment(lock, seg);
}

segments.emplace(merged->getRowKeyRange().getEnd(), merged);
id_to_segment.emplace(merged->segmentId(), merged);
addSegment(lock, merged);

if constexpr (DM_RUN_CHECK)
merged->check(dm_context, "After segment merge");
Expand Down Expand Up @@ -474,9 +498,9 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

SegmentPtr new_segment;
{
std::unique_lock read_write_lock(read_write_mutex);
std::unique_lock lock(read_write_mutex);

if (!isSegmentValid(read_write_lock, segment))
if (!isSegmentValid(lock, segment))
{
LOG_DEBUG(
log,
Expand All @@ -495,11 +519,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

// The instance of PKRange::End is closely linked to instance of PKRange. So we cannot reuse it.
// Replace must be done by erase + insert.
segments.erase(segment->getRowKeyRange().getEnd());
id_to_segment.erase(segment->segmentId());

segments[new_segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[new_segment->segmentId()] = new_segment;
replaceSegment(lock, segment, new_segment);

segment->abandon(dm_context);

Expand Down Expand Up @@ -604,8 +624,7 @@ SegmentPtr DeltaMergeStore::segmentIngestData(
RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info());

segment->abandon(dm_context);
segments[segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[segment->segmentId()] = new_segment;
replaceSegment(lock, segment, new_segment);

LOG_INFO(
log,
Expand Down Expand Up @@ -682,8 +701,7 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceDataFromCheckpoint(
wbs.writeMeta();

segment->abandon(dm_context);
segments[segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[segment->segmentId()] = new_segment;
replaceSegment(lock, segment, new_segment);

LOG_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", segment->info(), new_segment->info());
}
Expand Down Expand Up @@ -729,6 +747,4 @@ bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment)
return true;
}

} // namespace DM

} // namespace DB
} // namespace DB::DM
Loading

0 comments on commit 4cc2c02

Please sign in to comment.