Skip to content

Commit

Permalink
Revert "Fix calculate stable property (#1839) (#1878)"
Browse files Browse the repository at this point in the history
This reverts commit 6a9eb58.
  • Loading branch information
lidezhu committed May 27, 2021
1 parent 261226f commit 4dab341
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 92 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
dm_context->min_version = gc_safe_point;
// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle(), rowkey_column_size);

try
{
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,10 +1083,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context,
auto left_stream = getStream(left, left_snap);
auto right_stream = getStream(right, right_snap);

BlockInputStreamPtr merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream});
// for the purpose to calculate StableProperty of the new segment
merged_stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
merged_stream, *schema_snap, dm_context.min_version, dm_context.is_common_handle);
auto merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream});

auto merged_stable_id = left->stable->getId();
auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs, need_rate_limit);
Expand Down
59 changes: 19 additions & 40 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const
}
}

void StableValueSpace::calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle)
void StableValueSpace::calculateStableProperty(const DMContext & context,
const RowKeyRange & rowkey_range,
bool is_common_handle,
size_t rowkey_column_size)
{
property.gc_hint_version = std::numeric_limits<UInt64>::max();
property.num_versions = 0;
Expand All @@ -161,15 +164,12 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
auto & file = files[i];
auto & pack_stats = file->getPackStats();
auto & pack_properties = file->getPackProperties();
if (pack_stats.empty())
continue;
// if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version.
// so we need to create file property for this file.
// but to keep dmfile immutable, we just cache the result in memory.
//
// `new_pack_properties` is the temporary container for the calculation result of this StableValueSpace's pack property.
// Note that `pack_stats` stores the stat of the whole underlying DTFile,
// and this Segment may share this DTFile with other Segment. So `pack_stats` may be larger than `new_pack_properties`.
// Because the underlying DTFile may be shared by other Segment, so we cannot modify `pack_properties` directly.
DMFile::PackProperties new_pack_properties;
if (pack_properties.property_size() == 0)
{
Expand All @@ -190,7 +190,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
false,
file,
read_columns,
rowkey_range,
RowKeyRange::newAll(is_common_handle, rowkey_column_size),
nullptr,
nullptr,
IdSetPtr{},
Expand All @@ -210,56 +210,35 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const
continue;

size_t cur_effective_num_rows = mvcc_stream->getEffectiveNumRows();
size_t gc_hint_version = mvcc_stream->getGCHintVersion();
auto * pack_property = new_pack_properties.add_property();
pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows);
pack_property->set_gc_hint_version(gc_hint_version);
}
mvcc_stream->readSuffix();
if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size()))
{
throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size())
+ " doesn't contain all info for packs",
ErrorCodes::LOGICAL_ERROR);
}
}
auto pack_filter = DMFilePackFilter::loadFrom(file,
auto pack_filter = DMFilePackFilter::loadFrom(file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
rowkey_range,
EMPTY_FILTER,
{},
context.db_context.getFileProvider());
auto & use_packs = pack_filter.getUsePacks();
size_t new_pack_properties_index = 0;
bool use_new_pack_properties = pack_properties.property_size() == 0;
if (use_new_pack_properties)
{
size_t use_packs_count = 0;
for (auto is_used : use_packs)
{
if (is_used)
use_packs_count += 1;
}
if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count))
{
throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size())
+ " doesn't match use packs size " + std::to_string(use_packs_count),
ErrorCodes::LOGICAL_ERROR);
}
}
auto & use_packs = pack_filter.getUsePacks();
bool use_new_pack_properties = pack_properties.property_size() == 0;
for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++)
{
if (!use_packs[pack_id])
continue;
property.num_versions += pack_stats[pack_id].rows;
property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean;
if (use_new_pack_properties)
{
auto & pack_property = new_pack_properties.property(new_pack_properties_index);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
new_pack_properties_index += 1;
}
else
{
auto & pack_property = pack_properties.property(pack_id);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
}

auto & pack_property = use_new_pack_properties ? new_pack_properties.property(pack_id) : pack_properties.property(pack_id);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
}
}
is_property_cached.store(true, std::memory_order_release);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

const StableProperty & getStableProperty() const { return property; }

void calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle);
void
calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle, size_t rowkey_column_size);

struct Snapshot;
using SnapshotPtr = std::shared_ptr<Snapshot>;
Expand Down
63 changes: 18 additions & 45 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1570,90 +1570,63 @@ CATCH
TEST_F(Segment_test, CalculateDTFileProperty)
try
{
Settings settings = dmContext().db_context.getSettings();
settings.dt_segment_stable_pack_rows = 10;

segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings));

const size_t num_rows_write_every_round = 100;
const size_t write_round = 3;
const size_t tso = 10000;
for (size_t i = 0; i < write_round; i++)
const size_t num_rows_write = 100;
const size_t tso = 10000;
{
size_t start = num_rows_write_every_round * i;
Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso);
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso);
// write to segment
segment->write(dmContext(), block);
segment = segment->mergeDelta(dmContext(), tableColumns());
}

{
auto & stable = segment->getStable();
ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1);
ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round);
ASSERT_EQ(stable->getRows(), num_rows_write);
// caculate StableProperty
ASSERT_EQ(stable->isStablePropertyCached(), false);
auto start = RowKeyValue::fromHandle(0);
auto end = RowKeyValue::fromHandle(num_rows_write_every_round);
RowKeyRange range(start, end, false, 1);
// calculate the StableProperty for packs in the key range [0, num_rows_write_every_round)
stable->calculateStableProperty(dmContext(), range, false);
stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1);
ASSERT_EQ(stable->isStablePropertyCached(), true);
auto & property = stable->getStableProperty();
ASSERT_EQ(property.gc_hint_version, UINT64_MAX);
ASSERT_EQ(property.num_versions, num_rows_write_every_round);
ASSERT_EQ(property.num_puts, num_rows_write_every_round);
ASSERT_EQ(property.num_rows, num_rows_write_every_round);
ASSERT_EQ(property.num_versions, num_rows_write);
ASSERT_EQ(property.num_puts, num_rows_write);
ASSERT_EQ(property.num_rows, num_rows_write);
}
}
CATCH

TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted)
try
{
Settings settings = dmContext().db_context.getSettings();
settings.dt_segment_stable_pack_rows = 10;

segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings));

const size_t num_rows_write_every_round = 100;
const size_t write_round = 3;
const size_t tso = 10000;
for (size_t i = 0; i < write_round; i++)
const size_t num_rows_write = 100;
const size_t tso = 10000;
{
size_t start = num_rows_write_every_round * i;
Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso);
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso);
// write to segment
segment->write(dmContext(), block);
segment = segment->mergeDelta(dmContext(), tableColumns());
}

{
auto & stable = segment->getStable();
auto & stable = segment->getStable();
ASSERT_EQ(stable->getRows(), num_rows_write);
auto & dmfiles = stable->getDMFiles();
ASSERT_GT(dmfiles[0]->getPacks(), (size_t)1);
ASSERT_GT(dmfiles.size(), (size_t)0);
auto & dmfile = dmfiles[0];
auto file_path = dmfile->path();
// check property file exists and then delete it
ASSERT_EQ(Poco::File(file_path + "/property").exists(), true);
Poco::File(file_path + "/property").remove();
ASSERT_EQ(Poco::File(file_path + "/property").exists(), false);
// clear PackProperties to force it to calculate from scratch
dmfile->getPackProperties().clear_property();
ASSERT_EQ(dmfile->getPackProperties().property_size(), 0);
// caculate StableProperty
ASSERT_EQ(stable->isStablePropertyCached(), false);
auto start = RowKeyValue::fromHandle(0);
auto end = RowKeyValue::fromHandle(num_rows_write_every_round);
RowKeyRange range(start, end, false, 1);
// calculate the StableProperty for packs in the key range [0, num_rows_write_every_round)
stable->calculateStableProperty(dmContext(), range, false);
stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1);
ASSERT_EQ(stable->isStablePropertyCached(), true);
auto & property = stable->getStableProperty();
ASSERT_EQ(property.gc_hint_version, UINT64_MAX);
ASSERT_EQ(property.num_versions, num_rows_write_every_round);
ASSERT_EQ(property.num_puts, num_rows_write_every_round);
ASSERT_EQ(property.num_rows, num_rows_write_every_round);
ASSERT_EQ(property.num_versions, num_rows_write);
ASSERT_EQ(property.num_puts, num_rows_write);
ASSERT_EQ(property.num_rows, num_rows_write);
}
}
CATCH
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bool GCManager::work()
auto storage = iter->second.lock();
iter++;
// The storage has been free
if (!storage)
if (!storage || storage->is_dropped)
continue;

try
Expand All @@ -71,6 +71,7 @@ bool GCManager::work()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
iter++;
}
if (iter == storages.end())
iter = storages.begin();
Expand Down

0 comments on commit 4dab341

Please sign in to comment.