From 2a39f10053fb22f68579cc87852d3add18c50804 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 11:37:42 +0800 Subject: [PATCH 01/15] Fix calculate stable property for old format DMFile and merged segment --- dbms/src/Storages/DeltaMerge/Segment.cpp | 5 ++++- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 3 +-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index ab122cb6f1c..5c71796b394 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1085,7 +1085,10 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, auto left_stream = getStream(left, left_snap); auto right_stream = getStream(right, right_snap); - auto merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); + BlockInputStreamPtr merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); + // for the purpose to calculate StableProperty of the new segment + merged_stream = std::make_shared>( + merged_stream, *schema_snap, dm_context.min_version, dm_context.is_common_handle); auto merged_stable_id = left->stable->getId(); auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs, need_rate_limit); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index cdc31252421..311b0c53e33 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -198,7 +198,6 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, IdSetPtr{}, UINT64_MAX, // because we just read one pack at a time true); - data_stream = std::make_shared>(data_stream, rowkey_range, 0); auto mvcc_stream = std::make_shared>( data_stream, read_columns, 0, is_common_handle); mvcc_stream->readPrefix(); @@ -220,7 +219,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size())) { throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't contain all info for packs", + + " doesn't match pack_stat size " + std::to_string(pack_stats.size()), ErrorCodes::LOGICAL_ERROR); } } From e3d5930822891b695f65ea2b22f064f2b6d4e639 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 15:57:56 +0800 Subject: [PATCH 02/15] small fix --- dbms/src/Storages/GCManager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index df28820cd5e..1737244cf31 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -39,6 +39,7 @@ bool GCManager::work() iter = storages.begin(); checked_storage_num++; auto storage = iter->second.lock(); + iter++; // The storage has been free or dropped. if (!storage || storage->is_dropped) continue; @@ -58,7 +59,6 @@ bool GCManager::work() { tryLogCurrentException(__PRETTY_FUNCTION__); } - iter++; } if (iter == storages.end()) iter = storages.begin(); From a99248e1f1b0da858b4743806a27d6c08969381d Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 17:09:13 +0800 Subject: [PATCH 03/15] Modify the way to calculate StableProperty --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 41 +++++++++++++------ .../Storages/DeltaMerge/StableValueSpace.h | 3 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 4 +- 4 files changed, 32 insertions(+), 18 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 28900198bdc..8156029f09b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1397,7 +1397,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) dm_context->min_version = gc_safe_point; // calculate StableProperty if needed if (!segment->getStable()->isStablePropertyCached()) - segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle(), rowkey_column_size); + segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); try { diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 311b0c53e33..067a3a08036 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -151,10 +151,7 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const } } -void StableValueSpace::calculateStableProperty(const DMContext & context, - const RowKeyRange & rowkey_range, - bool is_common_handle, - size_t rowkey_column_size) +void StableValueSpace::calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle) { property.gc_hint_version = std::numeric_limits::max(); property.num_versions = 0; @@ -170,7 +167,8 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, // but to keep dmfile immutable, we just cache the result in memory. // // `new_pack_properties` is the temporary container for the calculation result of this StableValueSpace's pack property. - // Because the underlying DTFile may be shared by other Segment, so we cannot modify `pack_properties` directly. + // Note that `pack_stats` stores the stat of the whole underlying DTFile, + // and this Segment may share this DTFile with other Segment. So `pack_stats` may be larger than `new_pack_properties`. DMFile::PackProperties new_pack_properties; if (pack_properties.property_size() == 0) { @@ -192,12 +190,13 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, context.hash_salt, file, read_columns, - RowKeyRange::newAll(is_common_handle, rowkey_column_size), + rowkey_range, nullptr, nullptr, IdSetPtr{}, UINT64_MAX, // because we just read one pack at a time true); + data_stream = std::make_shared>(data_stream, rowkey_range, 0); auto mvcc_stream = std::make_shared>( data_stream, read_columns, 0, is_common_handle); mvcc_stream->readPrefix(); @@ -223,25 +222,41 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, ErrorCodes::LOGICAL_ERROR); } } - auto pack_filter = DMFilePackFilter::loadFrom(file, + auto pack_filter = DMFilePackFilter::loadFrom(file, context.db_context.getGlobalContext().getMinMaxIndexCache(), context.hash_salt, rowkey_range, EMPTY_FILTER, {}, context.db_context.getFileProvider()); - auto & use_packs = pack_filter.getUsePacks(); - bool use_new_pack_properties = pack_properties.property_size() == 0; + auto & use_packs = pack_filter.getUsePacks(); + size_t new_pack_properties_index = 0; + bool use_new_pack_properties = pack_properties.property_size() == 0; for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) { if (!use_packs[pack_id]) continue; property.num_versions += pack_stats[pack_id].rows; property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean; - - auto & pack_property = use_new_pack_properties ? new_pack_properties.property(pack_id) : pack_properties.property(pack_id); - property.num_rows += pack_property.num_rows(); - property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); + if (use_new_pack_properties) + { + if (unlikely((size_t)new_pack_properties.property_size() <= new_pack_properties_index)) + { + throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) + + " doesn't contain all info for packs", + ErrorCodes::LOGICAL_ERROR); + } + auto & pack_property = new_pack_properties.property(new_pack_properties_index); + property.num_rows += pack_property.num_rows(); + property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); + new_pack_properties_index += 1; + } + else + { + auto & pack_property = pack_properties.property(pack_id); + property.num_rows += pack_property.num_rows(); + property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); + } } } is_property_cached.store(true, std::memory_order_release); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index 5d657168c3a..d438be335d8 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -68,8 +68,7 @@ class StableValueSpace : public std::enable_shared_from_this const StableProperty & getStableProperty() const { return property; } - void - calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle, size_t rowkey_column_size); + void calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle); struct Snapshot; using SnapshotPtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index be68c8cccc0..245adb6daab 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1587,7 +1587,7 @@ try ASSERT_EQ(stable->getRows(), num_rows_write); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1); + stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); @@ -1623,7 +1623,7 @@ try ASSERT_EQ(Poco::File(file_path + "/property").exists(), false); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1); + stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); From 552885a396cc5397bcec7033586fdef8a3f6ba5e Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 19:38:04 +0800 Subject: [PATCH 04/15] add failpoint for stable calculation test --- dbms/src/Common/FailPoint.cpp | 14 ++++ dbms/src/Storages/DeltaMerge/DeltaMerge.h | 9 +++ .../DeltaMerge/tests/gtest_dm_segment.cpp | 69 +++++++++++++------ 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 51661bd7c6a..63cdd82eef2 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -39,6 +39,8 @@ std::unordered_map> FailPointHelper::f M(exception_during_mpp_root_task_run) \ M(exception_during_write_to_storage) +#define APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) M(force_set_delta_merge_max_block_size) + #define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \ M(pause_after_learner_read) \ M(hang_in_execution) \ @@ -52,6 +54,7 @@ namespace FailPoints { #define M(NAME) extern const char NAME[] = #NAME ""; APPLY_FOR_FAILPOINTS(M) +APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) #undef M } // namespace FailPoints @@ -93,6 +96,17 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name) APPLY_FOR_FAILPOINTS(M) #undef M +#define M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + /* 0 -- The failpoint will always fail, you must disable the failpoint manually */ \ + fiu_enable(FailPoints::NAME, 1, nullptr, 0); \ + return; \ + } + + APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) +#undef M + #define M(NAME) \ if (fail_point_name == FailPoints::NAME) \ { \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index 3ec56f0ac17..0f572c9150c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -13,6 +14,11 @@ namespace DB { +namespace FailPoints +{ +extern const char force_set_delta_merge_max_block_size[]; +} // namespace FailPoints + namespace DM { @@ -98,6 +104,9 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc throw Exception("The end of rowkey range should be +Inf in skippable_place mode"); } + // Use failpoint to change the max_block_size for some test cases + fiu_do_on(FailPoints::force_set_delta_merge_max_block_size, { max_block_size = 10; }); + header = stable_input_stream->getHeader(); num_columns = header.columns(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 245adb6daab..a30a50e44c6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -12,6 +13,11 @@ namespace DB { +namespace FailPoints +{ +extern const char force_set_delta_merge_max_block_size[]; +} // namespace FailPoints + namespace DM { extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // @@ -1077,7 +1083,8 @@ try case Segment_test_Mode::V2_BlockOnly: segment->write(dmContext(), std::move(block)); break; - case Segment_test_Mode::V2_FileOnly: { + case Segment_test_Mode::V2_FileOnly: + { auto delegate = dmContext().path_pool.getStableDiskDelegator(); auto file_provider = dmContext().db_context.getFileProvider(); auto [range, file_ids] = genDMFile(dmContext(), block); @@ -1573,27 +1580,38 @@ CATCH TEST_F(Segment_test, CalculateDTFileProperty) try { - const size_t num_rows_write = 100; - const size_t tso = 10000; - { - Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso); + const size_t num_rows_write_every_round = 100; + const size_t write_round = 3; + const size_t tso = 10000; + // set max_block_size in DeltaMerge to a small value, so the following mergeDelta can produce more than one pack + FailPointHelper::enableFailPoint(FailPoints::force_set_delta_merge_max_block_size); + for (size_t i = 0; i < write_round; i++) + { + size_t start = num_rows_write_every_round * i; + Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); // write to segment segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); } + FailPointHelper::disableFailPoint(FailPoints::force_set_delta_merge_max_block_size); { auto & stable = segment->getStable(); - ASSERT_EQ(stable->getRows(), num_rows_write); + ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1); + ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false); + auto start = RowKeyValue::fromHandle(0); + auto end = RowKeyValue::fromHandle(num_rows_write_every_round); + RowKeyRange range(start, end, false, 1); + // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) + stable->calculateStableProperty(dmContext(), range, false); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write); - ASSERT_EQ(property.num_puts, num_rows_write); - ASSERT_EQ(property.num_rows, num_rows_write); + ASSERT_EQ(property.num_versions, num_rows_write_every_round); + ASSERT_EQ(property.num_puts, num_rows_write_every_round); + ASSERT_EQ(property.num_rows, num_rows_write_every_round); } } CATCH @@ -1601,20 +1619,25 @@ CATCH TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted) try { - const size_t num_rows_write = 100; - const size_t tso = 10000; - { - Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso); + const size_t num_rows_write_every_round = 100; + const size_t write_round = 3; + const size_t tso = 10000; + // set max_block_size in DeltaMerge to a small value, so the following mergeDelta can produce more than one pack + FailPointHelper::enableFailPoint(FailPoints::force_set_delta_merge_max_block_size); + for (size_t i = 0; i < write_round; i++) + { + size_t start = num_rows_write_every_round * i; + Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); // write to segment segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); } + FailPointHelper::disableFailPoint(FailPoints::force_set_delta_merge_max_block_size); { - auto & stable = segment->getStable(); - ASSERT_EQ(stable->getRows(), num_rows_write); + auto & stable = segment->getStable(); auto & dmfiles = stable->getDMFiles(); - ASSERT_GT(dmfiles.size(), (size_t)0); + ASSERT_GT(dmfiles[0]->getPacks(), (size_t)1); auto & dmfile = dmfiles[0]; auto file_path = dmfile->path(); // check property file exists and then delete it @@ -1623,13 +1646,17 @@ try ASSERT_EQ(Poco::File(file_path + "/property").exists(), false); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false); + auto start = RowKeyValue::fromHandle(0); + auto end = RowKeyValue::fromHandle(num_rows_write_every_round); + RowKeyRange range(start, end, false, 1); + // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) + stable->calculateStableProperty(dmContext(), range, false); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write); - ASSERT_EQ(property.num_puts, num_rows_write); - ASSERT_EQ(property.num_rows, num_rows_write); + ASSERT_EQ(property.num_versions, num_rows_write_every_round); + ASSERT_EQ(property.num_puts, num_rows_write_every_round); + ASSERT_EQ(property.num_rows, num_rows_write_every_round); } } CATCH From 1b99756be10ccd00897b8840286bd40a1dcc8e57 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 19:40:43 +0800 Subject: [PATCH 05/15] small fix --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 067a3a08036..30f92bfa597 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -215,12 +215,6 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows); } mvcc_stream->readSuffix(); - if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size())) - { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't match pack_stat size " + std::to_string(pack_stats.size()), - ErrorCodes::LOGICAL_ERROR); - } } auto pack_filter = DMFilePackFilter::loadFrom(file, context.db_context.getGlobalContext().getMinMaxIndexCache(), From e82b8b81615d4d5158a65bf1fd5d1aa1598d31a9 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 20:11:16 +0800 Subject: [PATCH 06/15] fix test --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 2 ++ dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp | 3 +++ 2 files changed, 5 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 30f92bfa597..832303b16c6 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -211,8 +211,10 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const continue; size_t cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); + size_t gc_hint_version = mvcc_stream->getGCHintVersion(); auto * pack_property = new_pack_properties.add_property(); pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows); + pack_property->set_gc_hint_version(gc_hint_version); } mvcc_stream->readSuffix(); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a30a50e44c6..250e6d4859b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1644,6 +1644,9 @@ try ASSERT_EQ(Poco::File(file_path + "/property").exists(), true); Poco::File(file_path + "/property").remove(); ASSERT_EQ(Poco::File(file_path + "/property").exists(), false); + // clear PackProperties to force it to calculate from scratch + dmfile->getPackProperties().clear_property(); + ASSERT_EQ(dmfile->getPackProperties().property_size(), 0); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); auto start = RowKeyValue::fromHandle(0); From 86e44c7ec3d204cd7da40eae5e40346b750f3e6c Mon Sep 17 00:00:00 2001 From: linkmyth Date: Wed, 28 Apr 2021 20:15:46 +0800 Subject: [PATCH 07/15] small fix --- dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 250e6d4859b..c37456e1cb3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1083,8 +1083,7 @@ try case Segment_test_Mode::V2_BlockOnly: segment->write(dmContext(), std::move(block)); break; - case Segment_test_Mode::V2_FileOnly: - { + case Segment_test_Mode::V2_FileOnly: { auto delegate = dmContext().path_pool.getStableDiskDelegator(); auto file_provider = dmContext().db_context.getFileProvider(); auto [range, file_ids] = genDMFile(dmContext(), block); From e3a9af0c3ca38feaf68a0b2ded5682203770ac37 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 29 Apr 2021 16:58:34 +0800 Subject: [PATCH 08/15] fix segmentation fault --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 8156029f09b..780b10a7335 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1380,9 +1380,10 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) continue; auto & segment_range = segment->getRowKeyRange(); + auto segment_range_debug_string = segment_range.toDebugString(); if (segment->getDelta()->isUpdating()) { - LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_DEBUG(log, "GC is skipped [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); continue; } @@ -1415,15 +1416,15 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) checkSegmentUpdate(dm_context, segment, type); gc_segments_num++; finish_gc_on_segment = true; - LOG_INFO(log, "GC-merge-delta done [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_INFO(log, "GC-merge-delta done [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); } } if (!finish_gc_on_segment) - LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_DEBUG(log, "GC is skipped [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); } catch (Exception & e) { - e.addMessage("while apply gc [range=" + segment_range.toDebugString() + "] [table=" + table_name + "]"); + e.addMessage("while apply gc [range=" + segment_range_debug_string + "] [table=" + table_name + "]"); e.rethrow(); } } From 84805060eff8de1618315b28a50b9fe022f3c095 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 29 Apr 2021 17:26:16 +0800 Subject: [PATCH 09/15] fix comment --- dbms/src/Common/FailPoint.cpp | 14 ------------- dbms/src/Storages/DeltaMerge/DeltaMerge.h | 8 ------- .../DeltaMerge/tests/gtest_dm_segment.cpp | 21 +++++++++---------- 3 files changed, 10 insertions(+), 33 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 63cdd82eef2..51661bd7c6a 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -39,8 +39,6 @@ std::unordered_map> FailPointHelper::f M(exception_during_mpp_root_task_run) \ M(exception_during_write_to_storage) -#define APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) M(force_set_delta_merge_max_block_size) - #define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \ M(pause_after_learner_read) \ M(hang_in_execution) \ @@ -54,7 +52,6 @@ namespace FailPoints { #define M(NAME) extern const char NAME[] = #NAME ""; APPLY_FOR_FAILPOINTS(M) -APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) #undef M } // namespace FailPoints @@ -96,17 +93,6 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name) APPLY_FOR_FAILPOINTS(M) #undef M -#define M(NAME) \ - if (fail_point_name == FailPoints::NAME) \ - { \ - /* 0 -- The failpoint will always fail, you must disable the failpoint manually */ \ - fiu_enable(FailPoints::NAME, 1, nullptr, 0); \ - return; \ - } - - APPLY_FOR_FAILPOINTS_ALWAYS_FAIL(M) -#undef M - #define M(NAME) \ if (fail_point_name == FailPoints::NAME) \ { \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index 0f572c9150c..e5a939b1f4e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -14,10 +13,6 @@ namespace DB { -namespace FailPoints -{ -extern const char force_set_delta_merge_max_block_size[]; -} // namespace FailPoints namespace DM { @@ -104,9 +99,6 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc throw Exception("The end of rowkey range should be +Inf in skippable_place mode"); } - // Use failpoint to change the max_block_size for some test cases - fiu_do_on(FailPoints::force_set_delta_merge_max_block_size, { max_block_size = 10; }); - header = stable_input_stream->getHeader(); num_columns = header.columns(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index c37456e1cb3..d8ef54eb5ce 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -13,10 +12,6 @@ namespace DB { -namespace FailPoints -{ -extern const char force_set_delta_merge_max_block_size[]; -} // namespace FailPoints namespace DM { @@ -1579,11 +1574,14 @@ CATCH TEST_F(Segment_test, CalculateDTFileProperty) try { + Settings settings = dmContext().db_context.getSettings(); + settings.dt_segment_stable_pack_rows = 10; + + segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + const size_t num_rows_write_every_round = 100; const size_t write_round = 3; const size_t tso = 10000; - // set max_block_size in DeltaMerge to a small value, so the following mergeDelta can produce more than one pack - FailPointHelper::enableFailPoint(FailPoints::force_set_delta_merge_max_block_size); for (size_t i = 0; i < write_round; i++) { size_t start = num_rows_write_every_round * i; @@ -1592,7 +1590,6 @@ try segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); } - FailPointHelper::disableFailPoint(FailPoints::force_set_delta_merge_max_block_size); { auto & stable = segment->getStable(); @@ -1618,11 +1615,14 @@ CATCH TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted) try { + Settings settings = dmContext().db_context.getSettings(); + settings.dt_segment_stable_pack_rows = 10; + + segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + const size_t num_rows_write_every_round = 100; const size_t write_round = 3; const size_t tso = 10000; - // set max_block_size in DeltaMerge to a small value, so the following mergeDelta can produce more than one pack - FailPointHelper::enableFailPoint(FailPoints::force_set_delta_merge_max_block_size); for (size_t i = 0; i < write_round; i++) { size_t start = num_rows_write_every_round * i; @@ -1631,7 +1631,6 @@ try segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); } - FailPointHelper::disableFailPoint(FailPoints::force_set_delta_merge_max_block_size); { auto & stable = segment->getStable(); From 89f2991ee7a7df660f57bdbe18d415c7e2d73a90 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 29 Apr 2021 17:27:58 +0800 Subject: [PATCH 10/15] small fix --- dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index d8ef54eb5ce..7df5c873a6f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1618,7 +1618,7 @@ try Settings settings = dmContext().db_context.getSettings(); settings.dt_segment_stable_pack_rows = 10; - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); const size_t num_rows_write_every_round = 100; const size_t write_round = 3; From 3286262d5a73eb3a0ceccf57f715b9c2ba4d405a Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 29 Apr 2021 17:29:49 +0800 Subject: [PATCH 11/15] small fix --- dbms/src/Storages/DeltaMerge/DeltaMerge.h | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index e5a939b1f4e..3ec56f0ac17 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -13,7 +13,6 @@ namespace DB { - namespace DM { From 733f36516de5904945c3d02deaa18d4355f34127 Mon Sep 17 00:00:00 2001 From: linkmyth Date: Thu, 29 Apr 2021 17:31:12 +0800 Subject: [PATCH 12/15] small fix --- dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 7df5c873a6f..e51789cebd5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -12,7 +12,6 @@ namespace DB { - namespace DM { extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // From 8e16f644da22f7c60913442e36ea70dc49655bdf Mon Sep 17 00:00:00 2001 From: linkmyth Date: Fri, 30 Apr 2021 10:49:52 +0800 Subject: [PATCH 13/15] fix comment --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 11 +++++----- .../Storages/DeltaMerge/StableValueSpace.cpp | 22 +++++++++++++------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 780b10a7335..741fb891e54 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1379,11 +1379,10 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segment->getLastCheckGCSafePoint() >= gc_safe_point) continue; - auto & segment_range = segment->getRowKeyRange(); - auto segment_range_debug_string = segment_range.toDebugString(); + RowKeyRange segment_range = segment->getRowKeyRange(); if (segment->getDelta()->isUpdating()) { - LOG_DEBUG(log, "GC is skipped [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); + LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); continue; } @@ -1416,15 +1415,15 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) checkSegmentUpdate(dm_context, segment, type); gc_segments_num++; finish_gc_on_segment = true; - LOG_INFO(log, "GC-merge-delta done [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); + LOG_INFO(log, "GC-merge-delta done [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); } } if (!finish_gc_on_segment) - LOG_DEBUG(log, "GC is skipped [range=" << segment_range_debug_string << "] [table=" << table_name << "]"); + LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); } catch (Exception & e) { - e.addMessage("while apply gc [range=" + segment_range_debug_string + "] [table=" + table_name + "]"); + e.addMessage("while apply gc [range=" + segment_range.toDebugString() + "] [table=" + table_name + "]"); e.rethrow(); } } diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 832303b16c6..dce482747b6 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -196,7 +196,6 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const IdSetPtr{}, UINT64_MAX, // because we just read one pack at a time true); - data_stream = std::make_shared>(data_stream, rowkey_range, 0); auto mvcc_stream = std::make_shared>( data_stream, read_columns, 0, is_common_handle); mvcc_stream->readPrefix(); @@ -228,6 +227,21 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const auto & use_packs = pack_filter.getUsePacks(); size_t new_pack_properties_index = 0; bool use_new_pack_properties = pack_properties.property_size() == 0; + if (use_new_pack_properties) + { + size_t use_packs_count = 0; + for (auto pack : use_packs) + { + if (pack) + use_packs_count += 1; + } + if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) + { + throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) + + " doesn't match use packs size " + std::to_string(use_packs_count), + ErrorCodes::LOGICAL_ERROR); + } + } for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) { if (!use_packs[pack_id]) @@ -236,12 +250,6 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean; if (use_new_pack_properties) { - if (unlikely((size_t)new_pack_properties.property_size() <= new_pack_properties_index)) - { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't contain all info for packs", - ErrorCodes::LOGICAL_ERROR); - } auto & pack_property = new_pack_properties.property(new_pack_properties_index); property.num_rows += pack_property.num_rows(); property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); From 7597a3434f323c05e70369ad3c9080052cd26d5d Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Fri, 30 Apr 2021 16:55:34 +0800 Subject: [PATCH 14/15] Update dbms/src/Storages/DeltaMerge/StableValueSpace.cpp Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index dce482747b6..f2276b320a5 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -230,9 +230,9 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const if (use_new_pack_properties) { size_t use_packs_count = 0; - for (auto pack : use_packs) + for (auto is_used : use_packs) { - if (pack) + if (is_used) use_packs_count += 1; } if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) From 09162f45bc3d1a166675e60ca403dc5429190f07 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 30 Apr 2021 21:10:10 +0800 Subject: [PATCH 15/15] Remove conflict Signed-off-by: JaySon-Huang --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 7f96e6c2a98..f2276b320a5 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -216,12 +216,6 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const pack_property->set_gc_hint_version(gc_hint_version); } mvcc_stream->readSuffix(); - if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size())) - { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't contain all info for packs " + std::to_string(pack_stats.size()), - ErrorCodes::LOGICAL_ERROR); - } } auto pack_filter = DMFilePackFilter::loadFrom(file, context.db_context.getGlobalContext().getMinMaxIndexCache(),