From 4cc2c02b91c2338f13e0c4d98224d479008e5dc6 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:57:24 +0800 Subject: [PATCH] storage: refine DeltaMergeStore (#9380) ref pingcap/tiflash#6233 storage: refine DeltaMergeStore Signed-off-by: Lloyd-Pottiger Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- dbms/CMakeLists.txt | 2 +- dbms/src/Common/NotNull.h | 5 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 92 ++++++++++++++++--- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 60 +++++++++++- .../DeltaMergeStore_InternalSegment.cpp | 72 +++++++++------ .../tests/gtest_dm_delta_merge_store.cpp | 37 ++++---- ...est_dm_delta_merge_store_fast_add_peer.cpp | 2 +- .../gtest_dm_delta_merge_store_test_basic.h | 4 +- .../tests/gtest_dm_minmax_index.cpp | 2 +- .../tests/gtest_dm_simple_pk_test_basic.cpp | 2 +- .../DeltaMerge/workload/DTWorkload.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.h | 2 - .../tests/gtests_parse_push_down_filter.cpp | 6 +- 14 files changed, 209 insertions(+), 81 deletions(-) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index a7a60040260..4e4aefc4a07 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -138,6 +138,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions add_library(tiflash_common_io ${SPLIT_SHARED} ${tiflash_common_io_headers} ${tiflash_common_io_sources}) target_link_libraries(tiflash_common_io PUBLIC process_metrics) target_include_directories(tiflash_common_io BEFORE PUBLIC ${XXHASH_INCLUDE_DIR} ${TiFlash_SOURCE_DIR}/libs/libprocess_metrics/include) +target_include_directories(tiflash_common_io PUBLIC ${TiFlash_SOURCE_DIR}/contrib/not_null/include) if (OS_FREEBSD) target_compile_definitions (tiflash_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST) @@ -153,7 +154,6 @@ else () install (TARGETS dbms LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT tiflash) endif () -target_include_directories(dbms PUBLIC ${TiFlash_SOURCE_DIR}/contrib/not_null/include) target_include_directories(dbms PUBLIC "${TIFLASH_PROXY_INCLUDE_DIR}") if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL") diff --git a/dbms/src/Common/NotNull.h b/dbms/src/Common/NotNull.h index 0794a5b2855..86e3ad3b751 100644 --- a/dbms/src/Common/NotNull.h +++ b/dbms/src/Common/NotNull.h @@ -16,7 +16,6 @@ #include -#include #include namespace DB @@ -30,7 +29,7 @@ using NotNull = cpp::bitwizeshift::not_null; template auto newNotNull(T && ptr) { - return cpp::bitwizeshift::check_not_null(std::move(ptr)); + return cpp::bitwizeshift::check_not_null(std::forward(ptr)); } template @@ -51,4 +50,4 @@ using NotNullShared = cpp::bitwizeshift::not_null>; template using NotNullUnique = cpp::bitwizeshift::not_null>; -}; // namespace DB \ No newline at end of file +}; // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5e9ed0e1cbc..4431c6797f6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -284,9 +284,10 @@ DeltaMergeStore::DeltaMergeStore( { auto task = [this, dm_context, segment_id] { auto segment = Segment::restoreSegment(log, *dm_context, segment_id); - std::lock_guard lock(read_write_mutex); - segments.emplace(segment->getRowKeyRange().getEnd(), segment); - id_to_segment.emplace(segment_id, segment); + { + std::unique_lock lock(read_write_mutex); + addSegment(lock, segment); + } }; wait_group->schedule(task); } @@ -299,9 +300,10 @@ DeltaMergeStore::DeltaMergeStore( while (segment_id != 0) { auto segment = Segment::restoreSegment(log, *dm_context, segment_id); - segments.emplace(segment->getRowKeyRange().getEnd(), segment); - id_to_segment.emplace(segment_id, segment); - + { + std::unique_lock lock(read_write_mutex); + addSegment(lock, segment); + } segment_id = segment->nextSegmentId(); } } @@ -318,6 +320,72 @@ DeltaMergeStore::DeltaMergeStore( LOG_INFO(log, "Restore DeltaMerge Store end, ps_run_mode={}", magic_enum::enum_name(page_storage_run_mode)); } +DeltaMergeStorePtr DeltaMergeStore::create( + Context & db_context, + bool data_path_contains_database_name, + const String & db_name_, + const String & table_name_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool has_replica, + const ColumnDefines & columns, + const ColumnDefine & handle, + bool is_common_handle_, + size_t rowkey_column_size_, + const Settings & settings_, + ThreadPool * thread_pool) +{ + auto * store = new DeltaMergeStore( + db_context, + data_path_contains_database_name, + db_name_, + table_name_, + keyspace_id_, + physical_table_id_, + has_replica, + columns, + handle, + is_common_handle_, + rowkey_column_size_, + settings_, + thread_pool); + std::shared_ptr store_shared_ptr(store); + return store_shared_ptr; +} + +std::unique_ptr DeltaMergeStore::createUnique( + Context & db_context, + bool data_path_contains_database_name, + const String & db_name_, + const String & table_name_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool has_replica, + const ColumnDefines & columns, + const ColumnDefine & handle, + bool is_common_handle_, + size_t rowkey_column_size_, + const Settings & settings_, + ThreadPool * thread_pool) +{ + auto * store = new DeltaMergeStore( + db_context, + data_path_contains_database_name, + db_name_, + table_name_, + keyspace_id_, + physical_table_id_, + has_replica, + columns, + handle, + is_common_handle_, + rowkey_column_size_, + settings_, + thread_pool); + std::unique_ptr store_unique_ptr(store); + return store_unique_ptr; +} + DeltaMergeStore::~DeltaMergeStore() { LOG_INFO(log, "Release DeltaMerge Store start"); @@ -389,16 +457,11 @@ void DeltaMergeStore::dropAllSegments(bool keep_first_segment) // The order to drop the meta and data of this segment doesn't matter, // Because there is no segment pointing to this segment, // so it won't be restored again even the drop process was interrupted by restart - segments.erase(segment_to_drop->getRowKeyRange().getEnd()); - id_to_segment.erase(segment_id_to_drop); + removeSegment(lock, segment_to_drop); if (previous_segment) { assert(new_previous_segment); - assert(previous_segment->segmentId() == new_previous_segment->segmentId()); - segments.erase(previous_segment->getRowKeyRange().getEnd()); - segments.emplace(new_previous_segment->getRowKeyRange().getEnd(), new_previous_segment); - id_to_segment.erase(previous_segment->segmentId()); - id_to_segment.emplace(new_previous_segment->segmentId(), new_previous_segment); + replaceSegment(lock, previous_segment, new_previous_segment); } auto drop_lock = segment_to_drop->mustGetUpdateLock(); segment_to_drop->abandon(*dm_context); @@ -2215,8 +2278,7 @@ void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context) RowKeyRange::newAll(is_common_handle, rowkey_column_size), segment_id, 0); - segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment); - id_to_segment.emplace(segment_id, first_segment); + addSegment(lock, first_segment); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 083890dc794..727e78ab1af 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -173,6 +173,9 @@ struct StoreStats UInt64 background_tasks_length = 0; }; +class DeltaMergeStore; +using DeltaMergeStorePtr = std::shared_ptr; + class DeltaMergeStore : private boost::noncopyable { public: @@ -260,8 +263,27 @@ class DeltaMergeStore : private boost::noncopyable BackgroundTask nextTask(bool is_heavy, const LoggerPtr & log_); }; +private: + // Let the constructor be private, so that we can control the creation of DeltaMergeStore. + // Please use DeltaMergeStore::create to create a DeltaMergeStore DeltaMergeStore( - Context & db_context, // + Context & db_context, + bool data_path_contains_database_name, + const String & db_name, + const String & table_name_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool has_replica, + const ColumnDefines & columns, + const ColumnDefine & handle, + bool is_common_handle_, + size_t rowkey_column_size_, + const Settings & settings_ = EMPTY_SETTINGS, + ThreadPool * thread_pool = nullptr); + +public: + static DeltaMergeStorePtr create( + Context & db_context, bool data_path_contains_database_name, const String & db_name, const String & table_name_, @@ -274,6 +296,22 @@ class DeltaMergeStore : private boost::noncopyable size_t rowkey_column_size_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); + + static std::unique_ptr createUnique( + Context & db_context, + bool data_path_contains_database_name, + const String & db_name, + const String & table_name_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool has_replica, + const ColumnDefines & columns, + const ColumnDefine & handle, + bool is_common_handle_, + size_t rowkey_column_size_, + const Settings & settings_ = EMPTY_SETTINGS, + ThreadPool * thread_pool = nullptr); + ~DeltaMergeStore(); void setUpBackgroundTask(const DMContextPtr & dm_context); @@ -765,6 +803,26 @@ class DeltaMergeStore : private boost::noncopyable bool try_split_task = true); private: + /** + * Remove the segment from the store's memory structure. + * Not protected by lock, should accquire lock before calling this function. + */ + void removeSegment(std::unique_lock &, const SegmentPtr & segment); + /** + * Add the segment to the store's memory structure. + * Not protected by lock, should accquire lock before calling this function. + */ + void addSegment(std::unique_lock &, const SegmentPtr & segment); + /** + * Replace the old segment with the new segment in the store's memory structure. + * New segment should have the same segment id as the old segment. + * Not protected by lock, should accquire lock before calling this function. + */ + void replaceSegment( + std::unique_lock &, + const SegmentPtr & old_segment, + const SegmentPtr & new_segment); + /** * Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc. * If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`). diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 7df4fa98f4d..8ddb6108fdc 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -34,10 +34,41 @@ extern const Metric DT_SnapshotOfDeltaMerge; extern const Metric DT_SnapshotOfSegmentIngest; } // namespace CurrentMetrics -namespace DB +namespace DB::DM { -namespace DM + +void DeltaMergeStore::removeSegment(std::unique_lock &, const SegmentPtr & segment) +{ + segments.erase(segment->getRowKeyRange().getEnd()); + id_to_segment.erase(segment->segmentId()); +} + +void DeltaMergeStore::addSegment(std::unique_lock &, const SegmentPtr & segment) { + RUNTIME_CHECK_MSG( + !segments.contains(segment->getRowKeyRange().getEnd()), + "Trying to add segment {} but there is a segment with the same key exists. Old segment must be removed " + "before adding new.", + segment->simpleInfo()); + segments[segment->getRowKeyRange().getEnd()] = segment; + id_to_segment[segment->segmentId()] = segment; +} + +void DeltaMergeStore::replaceSegment( + std::unique_lock &, + const SegmentPtr & old_segment, + const SegmentPtr & new_segment) +{ + RUNTIME_CHECK( + old_segment->segmentId() == new_segment->segmentId(), + old_segment->segmentId(), + new_segment->segmentId()); + segments.erase(old_segment->getRowKeyRange().getEnd()); + + segments[new_segment->getRowKeyRange().getEnd()] = new_segment; + id_to_segment[new_segment->segmentId()] = new_segment; +} + SegmentPair DeltaMergeStore::segmentSplit( DMContext & dm_context, const SegmentPtr & segment, @@ -180,14 +211,9 @@ SegmentPair DeltaMergeStore::segmentSplit( wbs.writeMeta(); segment->abandon(dm_context); - segments.erase(range.getEnd()); - id_to_segment.erase(segment->segmentId()); - - segments[new_left->getRowKeyRange().getEnd()] = new_left; - segments[new_right->getRowKeyRange().getEnd()] = new_right; - - id_to_segment.emplace(new_left->segmentId(), new_left); - id_to_segment.emplace(new_right->segmentId(), new_right); + removeSegment(lock, segment); + addSegment(lock, new_left); + addSegment(lock, new_right); if constexpr (DM_RUN_CHECK) { @@ -338,12 +364,10 @@ SegmentPtr DeltaMergeStore::segmentMerge( for (const auto & seg : ordered_segments) { seg->abandon(dm_context); - segments.erase(seg->getRowKeyRange().getEnd()); - id_to_segment.erase(seg->segmentId()); + removeSegment(lock, seg); } - segments.emplace(merged->getRowKeyRange().getEnd(), merged); - id_to_segment.emplace(merged->segmentId(), merged); + addSegment(lock, merged); if constexpr (DM_RUN_CHECK) merged->check(dm_context, "After segment merge"); @@ -474,9 +498,9 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( SegmentPtr new_segment; { - std::unique_lock read_write_lock(read_write_mutex); + std::unique_lock lock(read_write_mutex); - if (!isSegmentValid(read_write_lock, segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG( log, @@ -495,11 +519,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( // The instance of PKRange::End is closely linked to instance of PKRange. So we cannot reuse it. // Replace must be done by erase + insert. - segments.erase(segment->getRowKeyRange().getEnd()); - id_to_segment.erase(segment->segmentId()); - - segments[new_segment->getRowKeyRange().getEnd()] = new_segment; - id_to_segment[new_segment->segmentId()] = new_segment; + replaceSegment(lock, segment, new_segment); segment->abandon(dm_context); @@ -604,8 +624,7 @@ SegmentPtr DeltaMergeStore::segmentIngestData( RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info()); segment->abandon(dm_context); - segments[segment->getRowKeyRange().getEnd()] = new_segment; - id_to_segment[segment->segmentId()] = new_segment; + replaceSegment(lock, segment, new_segment); LOG_INFO( log, @@ -682,8 +701,7 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceDataFromCheckpoint( wbs.writeMeta(); segment->abandon(dm_context); - segments[segment->getRowKeyRange().getEnd()] = new_segment; - id_to_segment[segment->segmentId()] = new_segment; + replaceSegment(lock, segment, new_segment); LOG_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", segment->info(), new_segment->info()); } @@ -729,6 +747,4 @@ bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment) return true; } -} // namespace DM - -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 39db4bf7417..4be99198746 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -45,15 +45,13 @@ #include #include -namespace DB -{ -namespace ErrorCodes +namespace DB::ErrorCodes { extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; -} // namespace ErrorCodes +} // namespace DB::ErrorCodes -namespace FailPoints +namespace DB::FailPoints { extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; @@ -63,20 +61,20 @@ extern const char segment_merge_after_ingest_packs[]; extern const char force_set_segment_physical_split[]; extern const char force_set_page_file_write_errno[]; extern const char proactive_flush_force_set_type[]; -} // namespace FailPoints +} // namespace DB::FailPoints -namespace tests +namespace DB::tests { DM::PushDownFilterPtr generatePushDownFilter( Context & ctx, - const String table_info_json, + const String & table_info_json, const String & query, const std::optional & opt_tz = std::nullopt); -} -namespace DM -{ -namespace tests +} // namespace DB::tests + +namespace DB::DM::tests { + String testModeToString(const ::testing::TestParamInfo & info) { const auto mode = info.param; @@ -272,7 +270,7 @@ try { new_cols = DMTestEnv::getDefaultColumns(); ColumnDefine handle_column_define = (*new_cols)[0]; - new_store = std::make_shared( + new_store = DeltaMergeStore::create( *db_context, false, "test", @@ -3341,7 +3339,7 @@ class DeltaMergeStoreMergeDeltaBySegmentTest void setupDMStore() { auto cols = DMTestEnv::getDefaultColumns(pk_type); - store = std::make_shared( + store = DeltaMergeStore::create( *db_context, false, "test", @@ -3779,7 +3777,7 @@ try real_columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -3972,7 +3970,6 @@ try } CATCH - TEST_F(DeltaMergeStoreTest, RSResult) try { @@ -4049,7 +4046,7 @@ try fmt::format("select * from default.t_111 where col_time >= {}", value)); RUNTIME_CHECK(filter->extra_cast != nullptr); RUNTIME_CHECK(filter->rs_operator != nullptr); - auto rs_unsupported = typeid_cast(filter->rs_operator.get()); + const auto * rs_unsupported = typeid_cast(filter->rs_operator.get()); RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString()); RUNTIME_CHECK(filter->before_where != nullptr); LOG_DEBUG( @@ -4172,7 +4169,7 @@ try fmt::format("select * from default.t_111 where col_time >= {}", value)); RUNTIME_CHECK(filter->extra_cast != nullptr); RUNTIME_CHECK(filter->rs_operator != nullptr); - auto rs_unsupported = typeid_cast(filter->rs_operator.get()); + const auto * rs_unsupported = typeid_cast(filter->rs_operator.get()); RUNTIME_CHECK(rs_unsupported == nullptr, filter->rs_operator->toDebugString()); RUNTIME_CHECK(filter->before_where != nullptr); LOG_DEBUG( @@ -4217,6 +4214,4 @@ try } CATCH -} // namespace tests -} // namespace DM -} // namespace DB +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 9c238e814ae..9cd713dbf98 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -161,7 +161,7 @@ class DeltaMergeStoreTestFastAddPeer ColumnDefine handle_column_define = (*cols)[0]; - DeltaMergeStorePtr s = std::make_shared( + DeltaMergeStorePtr s = DeltaMergeStore::create( *db_context, false, "test", diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 704bf05d8a9..d0e966fd646 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -67,7 +67,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic ColumnDefine handle_column_define = (*cols)[0]; - DeltaMergeStorePtr s = std::make_shared( + DeltaMergeStorePtr s = DeltaMergeStore::create( *db_context, false, "test", @@ -183,7 +183,7 @@ class DeltaMergeStoreRWTest ColumnDefine handle_column_define = (*cols)[0]; - DeltaMergeStorePtr s = std::make_shared( + DeltaMergeStorePtr s = DeltaMergeStore::create( *db_context, false, "test", diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index 0174c7e4720..323968e8726 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -111,7 +111,7 @@ bool checkMatch( Block block = genBlock(header, block_tuples); // max page id is only updated at restart, so we need recreate page v3 before recreate table - DeltaMergeStorePtr store = std::make_shared( + DeltaMergeStorePtr store = DeltaMergeStore::create( context, false, "test_database", diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp index 74b01176a47..67e1091f0bd 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -47,7 +47,7 @@ void SimplePKTestBasic::reload() auto cols = DMTestEnv::getDefaultColumns( is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID); - store = std::make_shared( + store = DeltaMergeStore::create( *db_context, false, "test", diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index 14da643e73b..c061b7136d8 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -59,7 +59,7 @@ DTWorkload::DTWorkload( context->initializeGlobalPageIdAllocator(); context->initializeGlobalStoragePoolIfNeed(context->getPathPool()); Stopwatch sw; - store = std::make_unique( + store = DeltaMergeStore::createUnique( *context, true, table_info->db_name, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index b5d4db665d1..f6591ebccce 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1776,7 +1776,7 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread std::lock_guard lock(store_mutex); if (_store == nullptr) { - _store = std::make_shared( + _store = DeltaMergeStore::create( global_context, data_path_contains_database_name, table_column_info->db_name, diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 1000f68f551..c9f87a9601a 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -47,8 +47,6 @@ namespace DM { struct RowKeyRange; struct RowKeyValue; -class DeltaMergeStore; -using DeltaMergeStorePtr = std::shared_ptr; using RowKeyRanges = std::vector; struct ExternalDTFileInfo; struct GCOptions; diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index 90a271015c1..28abc8846d1 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -54,14 +54,14 @@ class ParsePushDownFilterTest : public ::testing::Test ContextPtr ctx = DB::tests::TiFlashTestEnv::getContext(); TimezoneInfo default_timezone_info = DB::tests::TiFlashTestEnv::getContext()->getTimezoneInfo(); DM::PushDownFilterPtr generatePushDownFilter( - String table_info_json, + const String & table_info_json, const String & query, TimezoneInfo & timezone_info); }; DM::PushDownFilterPtr generatePushDownFilter( Context & ctx, - const String table_info_json, + const String & table_info_json, const String & query, const std::optional & opt_tz = std::nullopt) { @@ -127,7 +127,7 @@ DM::PushDownFilterPtr generatePushDownFilter( } DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( - String table_info_json, + const String & table_info_json, const String & query, TimezoneInfo & timezone_info) {