diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 0c713fab0b0..89937f04794 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -839,13 +839,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( auto read_tag = need_row_id ? ReadTag::MVCC : ReadTag::Query; auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, max_version); - RowKeyRanges real_ranges; - for (const auto & read_range : read_ranges) - { - auto real_range = rowkey_range.shrink(read_range); - if (!real_range.none()) - real_ranges.emplace_back(std::move(real_range)); - } + auto real_ranges = shrinkRowKeyRanges(read_ranges); if (real_ranges.empty()) return std::make_shared(toEmptyBlock(*read_info.read_columns)); @@ -985,10 +979,16 @@ BlockInputStreamPtr Segment::getInputStreamModeFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & data_ranges, + const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, size_t expected_block_size) { + auto real_ranges = shrinkRowKeyRanges(read_ranges); + if (real_ranges.empty()) + { + return std::make_shared(toEmptyBlock(columns_to_read)); + } + auto new_columns_to_read = std::make_shared(); // new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column. @@ -1032,7 +1032,7 @@ BlockInputStreamPtr Segment::getInputStreamModeFast( BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, - data_ranges, + real_ranges, filter, std::numeric_limits::max(), expected_block_size, @@ -1049,8 +1049,8 @@ BlockInputStreamPtr Segment::getInputStreamModeFast( ReadTag::Query); // Do row key filtering based on data_ranges. - delta_stream = std::make_shared>(delta_stream, data_ranges, 0); - stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + delta_stream = std::make_shared>(delta_stream, real_ranges, 0); + stable_stream = std::make_shared>(stable_stream, real_ranges, 0); // Filter the unneeded column and filter out the rows whose del_mark is true. delta_stream diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 402184e906e..87618701904 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -253,7 +253,7 @@ class Segment const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & data_ranges, + const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, size_t expected_block_size = DEFAULT_BLOCK_SIZE); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp index f1f8a2108d1..0be30e0bdc8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp @@ -1344,6 +1344,118 @@ try } } CATCH + + +TEST_P(DeltaMergeStoreRWTest, TestFastScanWithLogicalSplit) +try +{ + constexpr auto num_write_rows = 32; + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + //Test write multi blocks without overlap and do not compact + { + auto block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false); + auto block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false); + auto block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false); + switch (mode) + { + case TestMode::V1_BlockOnly: + case TestMode::V2_BlockOnly: + case TestMode::V3_BlockOnly: + { + store->write(*db_context, db_context->getSettingsRef(), block1); + store->write(*db_context, db_context->getSettingsRef(), block2); + store->write(*db_context, db_context->getSettingsRef(), block3); + break; + } + case TestMode::V2_FileOnly: + case TestMode::V3_FileOnly: + { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range1, file_ids1] = genDMFile(*dm_context, block1); + auto [range2, file_ids2] = genDMFile(*dm_context, block2); + auto [range3, file_ids3] = genDMFile(*dm_context, block3); + auto range = range1.merge(range2).merge(range3); + auto file_ids = file_ids1; + file_ids.insert(file_ids.cend(), file_ids2.begin(), file_ids2.end()); + file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end()); + store->ingestFiles(dm_context, range, file_ids, false); + break; + } + case TestMode::V2_Mix: + case TestMode::V3_Mix: + { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range1, file_ids1] = genDMFile(*dm_context, block1); + auto [range3, file_ids3] = genDMFile(*dm_context, block3); + auto range = range1.merge(range3); + auto file_ids = file_ids1; + file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end()); + store->ingestFiles(dm_context, range, file_ids, false); + store->write(*db_context, db_context->getSettingsRef(), block2); + + break; + } + } + + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + + store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + store->mergeDeltaAll(*db_context); + + auto fastscan_rows = [&]() { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read( + *db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* start_ts= */ std::numeric_limits::max(), + EMPTY_FILTER, + std::vector{}, + 0, + TRACING_NAME, + /* keep_order= */ false, + /* is_fast_scan= */ true, + /* expected_block_size= */ 1024)[0]; + size_t rows = 0; + in->readPrefix(); + while (true) + { + auto b = in->read(); + if (!b) + { + break; + } + rows += b.rows(); + } + return rows; + }; + + auto before_split = fastscan_rows(); + + ASSERT_EQ(store->segments.size(), 1); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto old = store->segments.begin()->second; + auto [left, right] = store->segmentSplit( + *dm_context, + old, + DeltaMergeStore::SegmentSplitReason::ForegroundWrite, + std::nullopt, + DeltaMergeStore::SegmentSplitMode::Logical); + ASSERT_NE(left, nullptr); + ASSERT_NE(right, nullptr); + ASSERT_EQ(store->segments.size(), 2); + + auto after_split = fastscan_rows(); + + ASSERT_EQ(before_split, after_split); +} +CATCH } // namespace tests } // namespace DM } // namespace DB