From d50ae3beba5f4d10839ddb9586108dcffb72a8b3 Mon Sep 17 00:00:00 2001 From: Wish Date: Mon, 4 Jul 2022 16:52:32 +0800 Subject: [PATCH] Flush segment cache when doing the compaction Signed-off-by: Wish --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 7 +++ .../tests/gtest_dm_delta_merge_store.cpp | 49 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 195ed5c53c2..23ed756da27 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1055,6 +1055,13 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex segment = segment_it->second; } + if (!segment->flushCache(*dm_context)) + { + // If the flush failed, it means there are parallel updates to the segment in the background. + // In this case, we try again. + continue; + } + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); if (new_segment) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index d46e1b7aa36..b7913c44a2c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3762,6 +3762,55 @@ try CATCH +// Verify that unflushed data will also be compacted. +TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, Flush) +try +{ + { + // Write data to first 3 segments and flush. + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } + { + // Write new data to segment[1] without flush. + auto newly_written_rows = helper->rows_by_segments[1]; + Block block = DMTestEnv::prepareSimpleWriteBlock(helper->rows_by_segments[0], helper->rows_by_segments[0] + newly_written_rows, false, pk_type, 10 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_GT(segment1->getDelta()->getUnsavedRows(), 0); + } + { + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + ASSERT_NE(result, std::nullopt); + + segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(*result, segment1->getRowKeyRange()); + + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); + + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } +} +CATCH + + } // namespace tests } // namespace DM } // namespace DB