Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix calculate stable property #1839

Merged
merged 19 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
if (segment->getLastCheckGCSafePoint() >= gc_safe_point)
continue;

auto & segment_range = segment->getRowKeyRange();
RowKeyRange segment_range = segment->getRowKeyRange();
if (segment->getDelta()->isUpdating())
{
LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]");
Expand All @@ -1397,7 +1397,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
dm_context->min_version = gc_safe_point;
// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle(), rowkey_column_size);
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

try
{
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,10 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context,
auto left_stream = getStream(left, left_snap);
auto right_stream = getStream(right, right_snap);

auto merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream});
BlockInputStreamPtr merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream});
// for the purpose to calculate StableProperty of the new segment
merged_stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
merged_stream, *schema_snap, dm_context.min_version, dm_context.is_common_handle);

auto merged_stable_id = left->stable->getId();
auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs, need_rate_limit);
Expand Down
58 changes: 38 additions & 20 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const
}
}

void StableValueSpace::calculateStableProperty(const DMContext & context,
const RowKeyRange & rowkey_range,
bool is_common_handle,
size_t rowkey_column_size)
void StableValueSpace::calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle)
{
property.gc_hint_version = std::numeric_limits<UInt64>::max();
property.num_versions = 0;
Expand All @@ -170,7 +167,8 @@ void StableValueSpace::calculateStableProperty(const DMContext & context,
// but to keep dmfile immutable, we just cache the result in memory.
//
// `new_pack_properties` is the temporary container for the calculation result of this StableValueSpace's pack property.
// Because the underlying DTFile may be shared by other Segment, so we cannot modify `pack_properties` directly.
// Note that `pack_stats` stores the stat of the whole underlying DTFile,
// and this Segment may share this DTFile with other Segment. So `pack_stats` may be larger than `new_pack_properties`.
DMFile::PackProperties new_pack_properties;
if (pack_properties.property_size() == 0)
{
Expand All @@ -192,13 +190,12 @@ void StableValueSpace::calculateStableProperty(const DMContext & context,
context.hash_salt,
file,
read_columns,
RowKeyRange::newAll(is_common_handle, rowkey_column_size),
rowkey_range,
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
nullptr,
nullptr,
IdSetPtr{},
UINT64_MAX, // because we just read one pack at a time
true);
data_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(data_stream, rowkey_range, 0);
auto mvcc_stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
data_stream, read_columns, 0, is_common_handle);
mvcc_stream->readPrefix();
Expand All @@ -213,36 +210,57 @@ void StableValueSpace::calculateStableProperty(const DMContext & context,
continue;

size_t cur_effective_num_rows = mvcc_stream->getEffectiveNumRows();
size_t gc_hint_version = mvcc_stream->getGCHintVersion();
auto * pack_property = new_pack_properties.add_property();
pack_property->set_num_rows(cur_effective_num_rows - last_effective_num_rows);
pack_property->set_gc_hint_version(gc_hint_version);
}
mvcc_stream->readSuffix();
if (unlikely(pack_stats.size() != (size_t)new_pack_properties.property_size()))
{
throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size())
+ " doesn't contain all info for packs",
ErrorCodes::LOGICAL_ERROR);
}
}
auto pack_filter = DMFilePackFilter::loadFrom(file,
auto pack_filter = DMFilePackFilter::loadFrom(file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
rowkey_range,
EMPTY_FILTER,
{},
context.db_context.getFileProvider());
auto & use_packs = pack_filter.getUsePacks();
bool use_new_pack_properties = pack_properties.property_size() == 0;
auto & use_packs = pack_filter.getUsePacks();
size_t new_pack_properties_index = 0;
bool use_new_pack_properties = pack_properties.property_size() == 0;
if (use_new_pack_properties)
{
size_t use_packs_count = 0;
for (auto pack : use_packs)
{
if (pack)
use_packs_count += 1;
}
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count))
{
throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size())
+ " doesn't match use packs size " + std::to_string(use_packs_count),
ErrorCodes::LOGICAL_ERROR);
}
}
for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++)
{
if (!use_packs[pack_id])
continue;
property.num_versions += pack_stats[pack_id].rows;
property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean;

auto & pack_property = use_new_pack_properties ? new_pack_properties.property(pack_id) : pack_properties.property(pack_id);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
if (use_new_pack_properties)
{
auto & pack_property = new_pack_properties.property(new_pack_properties_index);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
new_pack_properties_index += 1;
}
else
{
auto & pack_property = pack_properties.property(pack_id);
property.num_rows += pack_property.num_rows();
property.gc_hint_version = std::min(property.gc_hint_version, pack_property.gc_hint_version());
}
}
}
is_property_cached.store(true, std::memory_order_release);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

const StableProperty & getStableProperty() const { return property; }

void
calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle, size_t rowkey_column_size);
void calculateStableProperty(const DMContext & context, const RowKeyRange & rowkey_range, bool is_common_handle);

struct Snapshot;
using SnapshotPtr = std::shared_ptr<Snapshot>;
Expand Down
63 changes: 45 additions & 18 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1573,63 +1573,90 @@ CATCH
TEST_F(Segment_test, CalculateDTFileProperty)
try
{
const size_t num_rows_write = 100;
const size_t tso = 10000;
Settings settings = dmContext().db_context.getSettings();
settings.dt_segment_stable_pack_rows = 10;

segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings));

const size_t num_rows_write_every_round = 100;
const size_t write_round = 3;
const size_t tso = 10000;
for (size_t i = 0; i < write_round; i++)
{
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso);
size_t start = num_rows_write_every_round * i;
Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso);
// write to segment
segment->write(dmContext(), block);
segment = segment->mergeDelta(dmContext(), tableColumns());
}

{
auto & stable = segment->getStable();
ASSERT_EQ(stable->getRows(), num_rows_write);
ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1);
ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round);
// caculate StableProperty
ASSERT_EQ(stable->isStablePropertyCached(), false);
stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1);
auto start = RowKeyValue::fromHandle(0);
auto end = RowKeyValue::fromHandle(num_rows_write_every_round);
RowKeyRange range(start, end, false, 1);
// calculate the StableProperty for packs in the key range [0, num_rows_write_every_round)
stable->calculateStableProperty(dmContext(), range, false);
ASSERT_EQ(stable->isStablePropertyCached(), true);
auto & property = stable->getStableProperty();
ASSERT_EQ(property.gc_hint_version, UINT64_MAX);
ASSERT_EQ(property.num_versions, num_rows_write);
ASSERT_EQ(property.num_puts, num_rows_write);
ASSERT_EQ(property.num_rows, num_rows_write);
ASSERT_EQ(property.num_versions, num_rows_write_every_round);
ASSERT_EQ(property.num_puts, num_rows_write_every_round);
ASSERT_EQ(property.num_rows, num_rows_write_every_round);
}
}
CATCH

TEST_F(Segment_test, CalculateDTFilePropertyWithPropertyFileDeleted)
try
{
const size_t num_rows_write = 100;
const size_t tso = 10000;
Settings settings = dmContext().db_context.getSettings();
settings.dt_segment_stable_pack_rows = 10;

segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings));

const size_t num_rows_write_every_round = 100;
const size_t write_round = 3;
const size_t tso = 10000;
for (size_t i = 0; i < write_round; i++)
{
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false, tso);
size_t start = num_rows_write_every_round * i;
Block block = DMTestEnv::prepareSimpleWriteBlock(start, start + num_rows_write_every_round, false, tso);
// write to segment
segment->write(dmContext(), block);
segment = segment->mergeDelta(dmContext(), tableColumns());
}

{
auto & stable = segment->getStable();
ASSERT_EQ(stable->getRows(), num_rows_write);
auto & stable = segment->getStable();
auto & dmfiles = stable->getDMFiles();
ASSERT_GT(dmfiles.size(), (size_t)0);
ASSERT_GT(dmfiles[0]->getPacks(), (size_t)1);
auto & dmfile = dmfiles[0];
auto file_path = dmfile->path();
// check property file exists and then delete it
ASSERT_EQ(Poco::File(file_path + "/property").exists(), true);
Poco::File(file_path + "/property").remove();
ASSERT_EQ(Poco::File(file_path + "/property").exists(), false);
// clear PackProperties to force it to calculate from scratch
dmfile->getPackProperties().clear_property();
ASSERT_EQ(dmfile->getPackProperties().property_size(), 0);
// caculate StableProperty
ASSERT_EQ(stable->isStablePropertyCached(), false);
stable->calculateStableProperty(dmContext(), segment->getRowKeyRange(), false, 1);
auto start = RowKeyValue::fromHandle(0);
auto end = RowKeyValue::fromHandle(num_rows_write_every_round);
RowKeyRange range(start, end, false, 1);
// calculate the StableProperty for packs in the key range [0, num_rows_write_every_round)
stable->calculateStableProperty(dmContext(), range, false);
ASSERT_EQ(stable->isStablePropertyCached(), true);
auto & property = stable->getStableProperty();
ASSERT_EQ(property.gc_hint_version, UINT64_MAX);
ASSERT_EQ(property.num_versions, num_rows_write);
ASSERT_EQ(property.num_puts, num_rows_write);
ASSERT_EQ(property.num_rows, num_rows_write);
ASSERT_EQ(property.num_versions, num_rows_write_every_round);
ASSERT_EQ(property.num_puts, num_rows_write_every_round);
ASSERT_EQ(property.num_rows, num_rows_write_every_round);
}
}
CATCH
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ bool GCManager::work()
iter = storages.begin();
checked_storage_num++;
auto storage = iter->second.lock();
iter++;
// The storage has been free or dropped.
if (!storage || storage->is_dropped)
continue;
Expand All @@ -58,7 +59,6 @@ bool GCManager::work()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
iter++;
}
if (iter == storages.end())
iter = storages.begin();
Expand Down