diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 496bd983b1e..5911d3d2e63 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -340,7 +340,7 @@ bool ColumnFileSetReader::shouldPlace( } else { - throw Exception("Unknown column file: " + column_file->toString(), ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column file: {}", column_file->toString()); } } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp index 84274ee6b3e..770fa0daac9 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp @@ -16,9 +16,7 @@ #include #include -namespace DB -{ -namespace DM +namespace DB::DM { RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const { @@ -30,5 +28,4 @@ RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const } return squashed_delete_range; } -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h index b5ef4966c6c..60dfed49164 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h @@ -17,9 +17,7 @@ #include #include -namespace DB -{ -namespace DM +namespace DB::DM { class ColumnFileSetSnapshot; using ColumnFileSetSnapshotPtr = std::shared_ptr; @@ -107,5 +105,4 @@ class ColumnFileSetSnapshot const auto & getDataProvider() const { return data_provider; } }; -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 4451ca3389c..45e72b0c844 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -350,7 +350,7 @@ class DeltaValueSnapshot #else public: #endif - bool is_update{false}; + const bool is_update{false}; // The delta index of cached. DeltaIndexPtr shared_delta_index; @@ -371,8 +371,7 @@ class DeltaValueSnapshot // We only allow one for_update snapshots to exist, so it cannot be cloned. RUNTIME_CHECK(!is_update); - auto c = std::make_shared(type); - c->is_update = is_update; + auto c = std::make_shared(type, is_update); c->shared_delta_index = shared_delta_index; c->delta_index_epoch = delta_index_epoch; c->mem_table_snap = mem_table_snap->clone(); @@ -383,8 +382,9 @@ class DeltaValueSnapshot return c; } - explicit DeltaValueSnapshot(CurrentMetrics::Metric type_) - : type(type_) + explicit DeltaValueSnapshot(CurrentMetrics::Metric type_, bool update_) + : is_update(update_) + , type(type_) { CurrentMetrics::add(type); } diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index 698ce530793..0062e1ba09a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -38,8 +38,7 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot( if (abandoned.load(std::memory_order_relaxed)) return {}; - auto snap = std::make_shared(type); - snap->is_update = for_update; + auto snap = std::make_shared(type, for_update); snap->delta = this->shared_from_this(); auto storage_snap = std::make_shared( @@ -220,16 +219,18 @@ bool DeltaValueReader::shouldPlace( { auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus(); - // Already placed. + // The placed_rows, placed_delete_range already contains the data in delta_snap if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes()) return false; - if (relevant_range.all() || relevant_range == segment_range_ // + if (relevant_range.all() || relevant_range == segment_range_ // read all the data in this segment || delta_snap->getRows() - placed_rows > context.delta_cache_limit_rows // - || placed_delete_ranges != delta_snap->getDeletes()) + || placed_delete_ranges != delta_snap->getDeletes() // new delete_range appended, must place it + ) return true; - size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset(); + // otherwise check persisted_files and mem_tables + const size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset(); return persisted_files_reader->shouldPlace(context, relevant_range, start_ts, placed_rows) || mem_table_reader->shouldPlace( context, diff --git a/dbms/src/Storages/DeltaMerge/DeltaIndex.h b/dbms/src/Storages/DeltaMerge/DeltaIndex.h index 381c9d30430..40990453ff8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaIndex.h +++ b/dbms/src/Storages/DeltaMerge/DeltaIndex.h @@ -18,9 +18,7 @@ #include #include #include -namespace DB -{ -namespace DM +namespace DB::DM { class DeltaIndex; using DeltaIndexPtr = std::shared_ptr; @@ -171,6 +169,7 @@ class DeltaIndex return delta_tree->getBytes(); } + // Return std::pair getPlacedStatus() { std::scoped_lock lock(mutex); @@ -219,5 +218,4 @@ class DeltaIndex const std::optional & getRNCacheKey() const { return rn_cache_key; } }; -} // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index c40071bcbdc..1b64fb2067a 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -135,8 +135,7 @@ SegmentSnapshotPtr Serializer::deserializeSegment( auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; - auto delta_snap = std::make_shared(CurrentMetrics::DT_SnapshotOfDisaggReadNodeRead); - delta_snap->is_update = false; + auto delta_snap = std::make_shared(CurrentMetrics::DT_SnapshotOfDisaggReadNodeRead, false); delta_snap->mem_table_snap = deserializeColumnFileSet(dm_context, proto.column_files_memtable(), data_store, segment_range); delta_snap->persisted_files_snap diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 9bf094e7360..5ea01611d4d 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -121,6 +122,23 @@ extern const int UNKNOWN_FORMAT_VERSION; namespace DM { +String SegmentSnapshot::detailInfo() const +{ + return fmt::format( + "{{" + "stable_rows={} " + "persisted_rows={} persisted_dels={} persisted_cfs={} " + "mem_rows={} mem_dels={} mem_cfs={}" + "}}", + stable->getRows(), + delta->getPersistedFileSetSnapshot()->getRows(), + delta->getPersistedFileSetSnapshot()->getDeletes(), + delta->getPersistedFileSetSnapshot()->getColumnFileCount(), + delta->getMemTableSetSnapshot()->getRows(), + delta->getMemTableSetSnapshot()->getDeletes(), + delta->getMemTableSetSnapshot()->getColumnFileCount()); +} + const static size_t SEGMENT_BUFFER_SIZE = 128; // More than enough. DMFilePtr writeIntoNewDMFile( @@ -2401,7 +2419,7 @@ Segment::ReadInfo Segment::getReadInfo( ReadTag read_tag, UInt64 start_ts) const { - LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo"); + LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo {}", simpleInfo()); auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs = std::make_shared( @@ -2417,12 +2435,18 @@ Segment::ReadInfo Segment::getReadInfo( auto [my_delta_index, fully_indexed] = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, start_ts); auto compacted_index = my_delta_index->getDeltaTree()->getCompactedEntries(); - - // Hold compacted_index reference, to prevent it from deallocated. delta_reader->setDeltaIndex(compacted_index); - LOG_DEBUG(segment_snap->log, "Finish segment getReadInfo"); + LOG_DEBUG( + segment_snap->log, + "Finish segment getReadInfo, my_delta_index={} fully_indexed={} read_ranges={} " + "snap={} {}", + my_delta_index->toString(), + fully_indexed, + DB::DM::toDebugString(read_ranges), + segment_snap->detailInfo(), + simpleInfo()); if (fully_indexed) { @@ -2430,7 +2454,11 @@ Segment::ReadInfo Segment::getReadInfo( bool ok = segment_snap->delta->getSharedDeltaIndex()->updateIfAdvanced(*my_delta_index); if (ok) { - LOG_DEBUG(segment_snap->log, "Segment updated delta index"); + LOG_DEBUG( + segment_snap->log, + "Segment updated delta index, my_delta_index={} {}", + my_delta_index->toString(), + simpleInfo()); // Update cache size. if (auto cache = dm_context.global_context.getSharedContextDisagg()->rn_delta_index_cache; cache) cache->setDeltaIndex(segment_snap->delta->getSharedDeltaIndex()); @@ -2539,8 +2567,8 @@ std::pair Segment::ensurePlace( auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes()); auto my_delta_tree = my_delta_index->getDeltaTree(); - bool relevant_place = dm_context.enable_relevant_place; - bool skippable_place = dm_context.enable_skippable_place; + const bool relevant_place = dm_context.enable_relevant_place; + const bool skippable_place = dm_context.enable_skippable_place; // Note that, when enable_relevant_place is false , we cannot use the range of this segment. // Because some block / delete ranges could contain some data / range that are not belong to current segment. @@ -2552,7 +2580,9 @@ std::pair Segment::ensurePlace( // Let's do a fast check, determine whether we need to do place or not. if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, start_ts)) + { return {my_delta_index, false}; + } CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate}; GET_METRIC(tiflash_storage_subtask_count, type_place_index_update).Increment(); @@ -2578,8 +2608,11 @@ std::pair Segment::ensurePlace( auto offset = v.getBlockOffset(); auto rows = block.rows(); - if (unlikely(my_placed_rows != offset)) - throw Exception("Place block offset not match", ErrorCodes::LOGICAL_ERROR); + RUNTIME_CHECK_MSG( + my_placed_rows == offset, + "Place block offset not match, my_placed_rows={} offset={}", + my_placed_rows, + offset); if (skippable_place) fully_indexed &= placeUpsert( @@ -2629,25 +2662,24 @@ std::pair Segment::ensurePlace( } } - if (unlikely(my_placed_rows != delta_snap->getRows() || my_placed_deletes != delta_snap->getDeletes())) - { - throw Exception(fmt::format( - "Placed status not match! Expected place rows:{}, deletes:{}, but actually placed rows:{}, deletes:{}", - delta_snap->getRows(), - delta_snap->getDeletes(), - my_placed_rows, - my_placed_deletes)); - } + RUNTIME_CHECK_MSG( + my_placed_rows == delta_snap->getRows() && my_placed_deletes == delta_snap->getDeletes(), + "Placed status not match! Expected place rows:{}, deletes:{}, but actually placed rows:{}, deletes:{}", + delta_snap->getRows(), + delta_snap->getDeletes(), + my_placed_rows, + my_placed_deletes); my_delta_index->update(my_delta_tree, my_placed_rows, my_placed_deletes); LOG_DEBUG( segment_snap->log, - "Finish segment ensurePlace, read_ranges={} placed_items={} shared_delta_index={} my_delta_index={}", + "Finish segment ensurePlace, read_ranges={} placed_items={} shared_delta_index={} my_delta_index={} {}", DB::DM::toDebugString(read_ranges), items.size(), delta_snap->getSharedDeltaIndex()->toString(), - my_delta_index->toString()); + my_delta_index->toString(), + simpleInfo()); return {my_delta_index, fully_indexed}; } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 9add3a6f9fc..53941afe331 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -73,6 +73,8 @@ struct SegmentSnapshot : private boost::noncopyable // handle + version + flag return (sizeof(Int64) + sizeof(UInt64) + sizeof(UInt8)) * getRows(); } + + String detailInfo() const; }; /// A segment contains many rows of a table. A table is split into segments by consecutive ranges. @@ -570,7 +572,13 @@ class Segment /// Returns whether this segment has been marked as abandoned. /// Note: Segment member functions never abandon the segment itself. /// The abandon state is usually triggered by the DeltaMergeStore. - bool hasAbandoned() const { return delta->hasAbandoned(); } + bool hasAbandoned() const + { + // `delta` at disagg read-node is empty + if (unlikely(!delta)) + return false; + return delta->hasAbandoned(); + } bool isSplitForbidden() const { return split_forbidden; } void forbidSplit() { split_forbidden = true; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp index d9ad3252a0a..1966c8dba9c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp @@ -657,7 +657,7 @@ try } CATCH -TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_NoInMem) +TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesNoTinyNoInMem) try { auto fp_guard = disableFlushCache(); @@ -704,7 +704,7 @@ try } CATCH -TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_Tiny_NoInMem) +TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesTinyNoInMem) try { auto fp_guard = disableFlushCache(); @@ -747,7 +747,7 @@ try } CATCH -TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_NoTiny_InMem) +TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesNoTinyInMem) try { auto fp_guard = disableFlushCache(); @@ -798,7 +798,7 @@ try } CATCH -TEST_F(DMStoreForSegmentReadTaskTest, FetchPages_Tiny_InMem) +TEST_F(DMStoreForSegmentReadTaskTest, FetchPagesTinyInMem) try { auto fp_guard = disableFlushCache(); diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index cad8876e2ea..b4bea147297 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -341,7 +341,7 @@ RegionsReadIndexResult LearnerReadWorker::readIndex( void LearnerReadWorker::waitIndex( const LearnerReadSnapshot & regions_snapshot, - RegionsReadIndexResult & batch_read_index_result, + const RegionsReadIndexResult & batch_read_index_result, const UInt64 timeout_ms, Stopwatch & watch) { @@ -436,7 +436,7 @@ void LearnerReadWorker::waitIndex( unavailable_regions.size(), mvcc_query_info.start_ts); - auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String { + auto bypass_formatter = [](const RegionQueryInfo & query_info) -> String { if (query_info.bypass_lock_ts == nullptr) return ""; FmtBuffer buffer; @@ -456,9 +456,11 @@ void LearnerReadWorker::waitIndex( regions_info.end(), [&](const auto & region_to_query, FmtBuffer & f) { const auto & region = regions_snapshot.find(region_to_query.region_id)->second; + const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index(); f.fmtAppend( - "(id:{} applied_index:{} bypass_locks:{})", + "(region_id={} to_wait={} applied_index={} bypass_locks={})", region_to_query.region_id, + index_to_wait, region->appliedIndex(), bypass_formatter(region_to_query)); }, diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h index f546257bea7..da42ed2b9f0 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h @@ -150,7 +150,7 @@ class LearnerReadWorker /// wait index relate methods void waitIndex( const LearnerReadSnapshot & regions_snapshot, - RegionsReadIndexResult & batch_read_index_result, + const RegionsReadIndexResult & batch_read_index_result, UInt64 timeout_ms, Stopwatch & watch);