Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do background compaction when the ratio of stable data covered by delete range is too large #2416

Merged
merged 32 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
aa3dca4
Add tests to ensure the behavior of DeleteRange
JaySon-Huang Dec 30, 2020
e0d27b0
Cleanup some unreasonable include
JaySon-Huang May 26, 2021
6177426
Use macro to define enums
JaySon-Huang May 26, 2021
9c4f3a6
Merge branch 'master' into compact_by_delete_range
lidezhu Jul 19, 2021
adc1534
small fix
lidezhu Jul 19, 2021
e72f1c0
add more log
lidezhu Jul 20, 2021
32b9ab5
small fix
lidezhu Jul 20, 2021
b5cf9de
fix cannot gc bug
lidezhu Jul 20, 2021
e2c25d0
hack to test
lidezhu Jul 20, 2021
4eb4d00
add more log for test
lidezhu Jul 20, 2021
15a8af0
add log for debug
lidezhu Jul 20, 2021
64e4843
add more log for debug
lidezhu Jul 21, 2021
a2acfb7
try fix not merge
lidezhu Jul 21, 2021
73a51a1
try to merge all emtpy segments
lidezhu Jul 21, 2021
d780216
Merge branch 'master' into compact_by_delete_range
lidezhu Aug 4, 2021
60c6c65
remove extra log
lidezhu Aug 4, 2021
1f3f7e2
small fix
lidezhu Aug 4, 2021
b0498ff
fix conflict
lidezhu Aug 6, 2021
df90c4f
format code
lidezhu Aug 6, 2021
2aef8ba
fix compile
lidezhu Aug 6, 2021
bdc779d
remove macro
lidezhu Aug 12, 2021
1aa065a
remove comment in unit test
lidezhu Aug 12, 2021
cc32f91
small improvement for gtest comment
lidezhu Aug 12, 2021
96ec774
remove extra comment
lidezhu Aug 12, 2021
a826854
Update dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
lidezhu Aug 20, 2021
a5f9081
fix comment
lidezhu Aug 23, 2021
9fcd6f4
Merge branch 'compact_by_delete_range' of github.com:JaySon-Huang/tic…
lidezhu Aug 23, 2021
de06502
Merge branch 'master' into compact_by_delete_range
lidezhu Aug 23, 2021
c676302
format code
lidezhu Aug 23, 2021
0757d30
fix typo
lidezhu Aug 23, 2021
bd457f0
Merge branch 'master' into compact_by_delete_range
lidezhu Aug 23, 2021
4e7a39b
Merge branch 'master' into compact_by_delete_range
lidezhu Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ website/presentations
build_docker
docker/builder/tics
release-centos7/tiflash
release-centos7/tiflash-*
release-centos7/build-release
release-darwin/tiflash
release-darwin/build-release
Expand Down
707 changes: 418 additions & 289 deletions dbms/src/Interpreters/Settings.h

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return storage_snap; }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};
Expand Down
103 changes: 56 additions & 47 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ namespace DB::DM

std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset, size_t deletes_offset)
{
size_t rows_count = 0;
size_t rows_count = 0;
size_t deletes_count = 0;
size_t pack_index = 0;
size_t pack_index = 0;
for (; pack_index < packs.size(); ++pack_index)
{
if (rows_count == rows_offset && deletes_count == deletes_offset)
Expand All @@ -26,9 +26,9 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
{
if (unlikely(rows_count != rows_offset))
throw Exception("rows_count and rows_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
return {pack_index, 0};
}
++deletes_count;
Expand All @@ -40,18 +40,18 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
{
if (unlikely(deletes_count != deletes_offset))
throw Exception("deletes_count and deletes_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, pack->getRows() - (rows_count - rows_offset)};
}
}
}
if (rows_count != rows_offset || deletes_count != deletes_offset)
throw Exception("illegal rows_offset and deletes_offset. pack_size: " + DB::toString(packs.size())
+ ", rows_count: " + DB::toString(rows_count) + ", rows_offset: " + DB::toString(rows_offset)
+ ", deletes_count: " + DB::toString(deletes_count) + ", deletes_offset: " + DB::toString(deletes_offset));
+ ", rows_count: " + DB::toString(rows_count) + ", rows_offset: " + DB::toString(rows_offset)
+ ", deletes_count: " + DB::toString(deletes_count) + ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, 0};
}
Expand All @@ -69,13 +69,13 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
if (abandoned.load(std::memory_order_relaxed))
return {};

auto snap = std::make_shared<DeltaValueSnapshot>(type);
snap->is_update = for_update;
snap->_delta = this->shared_from_this();
auto snap = std::make_shared<DeltaValueSnapshot>(type);
snap->is_update = for_update;
snap->_delta = this->shared_from_this();
snap->storage_snap = std::make_shared<StorageSnapshot>(context.storage_pool, context.getReadLimiter(), true);
snap->rows = rows;
snap->bytes = bytes;
snap->deletes = deletes;
snap->rows = rows;
snap->bytes = bytes;
snap->deletes = deletes;
snap->packs.reserve(packs.size());

snap->shared_delta_index = delta_index;
Expand All @@ -87,9 +87,9 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
snap->deletes -= unsaved_deletes;
}

size_t check_rows = 0;
size_t check_rows = 0;
size_t check_deletes = 0;
size_t total_rows = 0;
size_t total_rows = 0;
size_t total_deletes = 0;
for (const auto & pack : packs)
{
Expand Down Expand Up @@ -122,15 +122,24 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (auto iter = packs.cbegin(); iter != packs.cend(); ++iter)
{
const auto & pack = *iter;
if (auto dp_delete = pack->tryToDeleteRange(); dp_delete)
squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange());
}
return squashed_delete_range;
}

// ================================================
// DeltaValueReader
// ================================================


DeltaValueReader::DeltaValueReader(const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
DeltaValueReader::DeltaValueReader(
const DMContext & context, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, const RowKeyRange & segment_range_)
: delta_snap(delta_snap_), col_defs(col_defs_), segment_range(segment_range_)
{
size_t total_rows = 0;
Expand All @@ -145,13 +154,13 @@ DeltaValueReader::DeltaValueReader(const DMContext & context,

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
auto new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
auto new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->_compacted_delta_index = _compacted_delta_index;
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;
new_reader->pack_rows = pack_rows;
new_reader->pack_rows_end = pack_rows_end;
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;
new_reader->pack_rows = pack_rows;
new_reader->pack_rows_end = pack_rows_end;

for (auto & pr : pack_readers)
new_reader->pack_readers.push_back(pr->createNewReader(new_col_defs));
Expand All @@ -175,18 +184,18 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s
auto total_delta_rows = delta_snap->getRows();

auto start = std::min(offset, total_delta_rows);
auto end = std::min(offset + limit, total_delta_rows);
auto end = std::min(offset + limit, total_delta_rows);
if (end == start)
return 0;

auto [start_pack_index, rows_start_in_start_pack] = locatePosByAccumulation(pack_rows_end, start);
auto [end_pack_index, rows_end_in_end_pack] = locatePosByAccumulation(pack_rows_end, end);
auto [end_pack_index, rows_end_in_end_pack] = locatePosByAccumulation(pack_rows_end, end);

size_t actual_read = 0;
for (size_t pack_index = start_pack_index; pack_index <= end_pack_index; ++pack_index)
{
size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : pack_rows[pack_index];
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : pack_rows[pack_index];
size_t rows_in_pack_limit = rows_end_in_pack - rows_start_in_pack;

// Nothing to read.
Expand Down Expand Up @@ -224,10 +233,10 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(size_t rows_begin, size_t deletes
auto & packs = delta_snap->getPacks();

auto [start_pack_index, rows_start_in_start_pack] = findPack(packs, rows_begin, deletes_begin);
auto [end_pack_index, rows_end_in_end_pack] = findPack(packs, rows_end, deletes_end);
auto [end_pack_index, rows_end_in_end_pack] = findPack(packs, rows_end, deletes_end);

size_t block_rows_start = rows_begin;
size_t block_rows_end = rows_begin;
size_t block_rows_end = rows_begin;

for (size_t pack_index = start_pack_index; pack_index < packs.size() && pack_index <= end_pack_index; ++pack_index)
{
Expand Down Expand Up @@ -260,7 +269,7 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(size_t rows_begin, size_t deletes
{
// It is a DeltaPackBlock.
size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : pack.getRows();
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : pack.getRows();

block_rows_end += rows_end_in_pack - rows_start_in_pack;

Expand All @@ -280,14 +289,14 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(size_t rows_begin, size_t deletes
return res;
}

bool DeltaValueReader::shouldPlace(const DMContext & context,
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version)
bool DeltaValueReader::shouldPlace(const DMContext & context,
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version)
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();
auto & packs = delta_snap->getPacks();
auto & packs = delta_snap->getPacks();

// Already placed.
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
Expand All @@ -311,14 +320,14 @@ bool DeltaValueReader::shouldPlace(const DMContext & context,
throw Exception("pack is delete range", ErrorCodes::LOGICAL_ERROR);

size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = pack_rows[pack_index];
size_t rows_end_in_pack = pack_rows[pack_index];

auto & pack_reader = pack_readers[pack_index];
auto & dpb_reader = typeid_cast<DPBlockReader &>(*pack_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();
auto & pack_reader = pack_readers[pack_index];
auto & dpb_reader = typeid_cast<DPBlockReader &>(*pack_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();

auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
auto & version_col_data = toColumnVectorData<UInt64>(version_column);

for (auto i = rows_start_in_pack; i < rows_end_in_pack; ++i)
Expand Down
Loading