diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 20598cb4ff2..e4c7b46356b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1446,7 +1446,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) dm_context->min_version = gc_safe_point; // calculate StableProperty if needed if (!segment->getStable()->isStablePropertyCached()) - segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); + segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle(), rowkey_column_size); try { diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 631cda19001..a477af6c886 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1083,10 +1083,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, auto left_stream = getStream(left, left_snap); auto right_stream = getStream(right, right_snap); - BlockInputStreamPtr merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); - // for the purpose to calculate StableProperty of the new segment - merged_stream = std::make_shared>( - merged_stream, *schema_snap, dm_context.min_version, dm_context.is_common_handle); + auto merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}); auto merged_stable_id = left->stable->getId(); auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs, need_rate_limit); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index ece6d2e8bb3..c79dbfa6b38 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -150,7 +150,10 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const } } -void StableValueSpace::calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle) +void StableValueSpace::calculateStableProperty(const DMContext & context, + const RowKeyRange & rowkey_range, + bool is_common_handle, + size_t rowkey_column_size) { property.gc_hint_version = std::numeric_limits::max(); property.num_versions = 0; @@ -161,15 +164,12 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const auto & file = files[i]; auto & pack_stats = file->getPackStats(); auto & pack_properties = file->getPackProperties(); - if (pack_stats.empty()) - continue; // if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version. // so we need to create file property for this file. // but to keep dmfile immutable, we just cache the result in memory. // // `new_pack_properties` is the temporary container for the calculation result of this StableValueSpace's pack property. - // Note that `pack_stats` stores the stat of the whole underlying DTFile, - // and this Segment may share this DTFile with other Segment. So `pack_stats` may be larger than `new_pack_properties`. + // Because the underlying DTFile may be shared by other Segment, so we cannot modify `pack_properties` directly. DMFile::PackProperties new_pack_properties; if (pack_properties.property_size() == 0) { @@ -190,7 +190,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const false, file, read_columns, - rowkey_range, + RowKeyRange::newAll(is_common_handle, rowkey_column_size), nullptr, nullptr, IdSetPtr{}, @@ -210,56 +210,35 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const continue; size_t cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - size_t gc_hint_version = mvcc_stream->getGCHintVersion(); auto * pack_property = new_pack_properties.add_property(); pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows); - pack_property->set_gc_hint_version(gc_hint_version); } mvcc_stream->readSuffix(); + if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size())) + { + throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) + + " doesn't contain all info for packs", + ErrorCodes::LOGICAL_ERROR); + } } - auto pack_filter = DMFilePackFilter::loadFrom(file, + auto pack_filter = DMFilePackFilter::loadFrom(file, context.db_context.getGlobalContext().getMinMaxIndexCache(), rowkey_range, EMPTY_FILTER, {}, context.db_context.getFileProvider()); - auto & use_packs = pack_filter.getUsePacks(); - size_t new_pack_properties_index = 0; - bool use_new_pack_properties = pack_properties.property_size() == 0; - if (use_new_pack_properties) - { - size_t use_packs_count = 0; - for (auto is_used : use_packs) - { - if (is_used) - use_packs_count += 1; - } - if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) - { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't match use packs size " + std::to_string(use_packs_count), - ErrorCodes::LOGICAL_ERROR); - } - } + auto & use_packs = pack_filter.getUsePacks(); + bool use_new_pack_properties = pack_properties.property_size() == 0; for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) { if (!use_packs[pack_id]) continue; property.num_versions += pack_stats[pack_id].rows; property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean; - if (use_new_pack_properties) - { - auto & pack_property = new_pack_properties.property(new_pack_properties_index); - property.num_rows += pack_property.num_rows(); - property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); - new_pack_properties_index += 1; - } - else - { - auto & pack_property = pack_properties.property(pack_id); - property.num_rows += pack_property.num_rows(); - property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); - } + + auto & pack_property = use_new_pack_properties ? new_pack_properties.property(pack_id) : pack_properties.property(pack_id); + property.num_rows += pack_property.num_rows(); + property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version()); } } is_property_cached.store(true, std::memory_order_release); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index d438be335d8..5d657168c3a 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -68,7 +68,8 @@ class StableValueSpace : public std::enable_shared_from_this const StableProperty & getStableProperty() const { return property; } - void calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle); + void + calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle, size_t rowkey_column_size); struct Snapshot; using SnapshotPtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a390f4d4c4d..743e45f4a4a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1570,18 +1570,10 @@ CATCH TEST_F(Segment_test, CalculateDTFileProperty) try { - Settings settings = dmContext().db_context.getSettings(); - settings.dt_segment_stable_pack_rows = 10; - - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); - - const size_t num_rows_write_every_round = 100; - const size_t write_round = 3; - const size_t tso = 10000; - for (size_t i = 0; i < write_round; i++) + const size_t num_rows_write = 100; + const size_t tso = 10000; { - size_t start = num_rows_write_every_round * i; - Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso); // write to segment segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); @@ -1589,21 +1581,16 @@ try { auto & stable = segment->getStable(); - ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1); - ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round); + ASSERT_EQ(stable->getRows(), num_rows_write); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - auto start = RowKeyValue::fromHandle(0); - auto end = RowKeyValue::fromHandle(num_rows_write_every_round); - RowKeyRange range(start, end, false, 1); - // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) - stable->calculateStableProperty(dmContext(), range, false); + stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write_every_round); - ASSERT_EQ(property.num_puts, num_rows_write_every_round); - ASSERT_EQ(property.num_rows, num_rows_write_every_round); + ASSERT_EQ(property.num_versions, num_rows_write); + ASSERT_EQ(property.num_puts, num_rows_write); + ASSERT_EQ(property.num_rows, num_rows_write); } } CATCH @@ -1611,49 +1598,35 @@ CATCH TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted) try { - Settings settings = dmContext().db_context.getSettings(); - settings.dt_segment_stable_pack_rows = 10; - - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); - - const size_t num_rows_write_every_round = 100; - const size_t write_round = 3; - const size_t tso = 10000; - for (size_t i = 0; i < write_round; i++) + const size_t num_rows_write = 100; + const size_t tso = 10000; { - size_t start = num_rows_write_every_round * i; - Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso); + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso); // write to segment segment->write(dmContext(), block); segment = segment->mergeDelta(dmContext(), tableColumns()); } { - auto & stable = segment->getStable(); + auto & stable = segment->getStable(); + ASSERT_EQ(stable->getRows(), num_rows_write); auto & dmfiles = stable->getDMFiles(); - ASSERT_GT(dmfiles[0]->getPacks(), (size_t)1); + ASSERT_GT(dmfiles.size(), (size_t)0); auto & dmfile = dmfiles[0]; auto file_path = dmfile->path(); // check property file exists and then delete it ASSERT_EQ(Poco::File(file_path + "/property").exists(), true); Poco::File(file_path + "/property").remove(); ASSERT_EQ(Poco::File(file_path + "/property").exists(), false); - // clear PackProperties to force it to calculate from scratch - dmfile->getPackProperties().clear_property(); - ASSERT_EQ(dmfile->getPackProperties().property_size(), 0); // caculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); - auto start = RowKeyValue::fromHandle(0); - auto end = RowKeyValue::fromHandle(num_rows_write_every_round); - RowKeyRange range(start, end, false, 1); - // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) - stable->calculateStableProperty(dmContext(), range, false); + stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); ASSERT_EQ(property.gc_hint_version, UINT64_MAX); - ASSERT_EQ(property.num_versions, num_rows_write_every_round); - ASSERT_EQ(property.num_puts, num_rows_write_every_round); - ASSERT_EQ(property.num_rows, num_rows_write_every_round); + ASSERT_EQ(property.num_versions, num_rows_write); + ASSERT_EQ(property.num_puts, num_rows_write); + ASSERT_EQ(property.num_rows, num_rows_write); } } CATCH diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index 47d9085f7a2..a5b83639f32 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -45,7 +45,7 @@ bool GCManager::work() auto storage = iter->second.lock(); iter++; // The storage has been free - if (!storage) + if (!storage || storage->is_dropped) continue; try @@ -71,6 +71,7 @@ bool GCManager::work() { tryLogCurrentException(__PRETTY_FUNCTION__); } + iter++; } if (iter == storages.end()) iter = storages.begin();