From 08c3ea1844c8a775b9be8d8b371641ea3dc6e4e9 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 11 Apr 2024 19:22:52 +0800 Subject: [PATCH] Storages: log region applied status when write (#8932) ref pingcap/tiflash#8864 --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 14 ++++++++++++-- dbms/src/Storages/DeltaMerge/DeltaMergeStore.h | 6 +++++- .../Storages/KVStore/Decode/PartitionStreams.cpp | 12 +++++++++--- dbms/src/Storages/KVStore/Types.h | 6 ++++++ dbms/src/Storages/StorageDeltaMerge.cpp | 7 +++++-- dbms/src/Storages/StorageDeltaMerge.h | 2 +- 6 files changed, 38 insertions(+), 9 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index e05aa1d77bc..dcfaa594296 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -533,7 +533,11 @@ Block DeltaMergeStore::addExtraColumnIfNeed( return std::move(block); } -DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_settings, Block & block) +DM::WriteResult DeltaMergeStore::write( + const Context & db_context, + const DB::Settings & db_settings, + Block & block, + const RegionAppliedStatus & applied_status) { LOG_TRACE(log, "Table write block, rows={} bytes={}", block.rows(), block.bytes()); @@ -554,7 +558,13 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set { dedup_ver.insert(v); } - LOG_DEBUG(log, "Record count: {}, Versions: {}", block.rows(), dedup_ver); + LOG_DEBUG( + log, + "region_id: {}, applied_index: {}, record_count: {}, versions: {}", + applied_status.region_id, + applied_status.applied_index, + block.rows(), + dedup_ver); } const auto bytes = block.bytes(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 6ed37ecec48..6c3931ef7b3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -291,7 +291,11 @@ class DeltaMergeStore : private boost::noncopyable static Block addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block); - DM::WriteResult write(const Context & db_context, const DB::Settings & db_settings, Block & block); + DM::WriteResult write( + const Context & db_context, + const DB::Settings & db_settings, + Block & block, + const RegionAppliedStatus & applied_status = {}); void deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange & delete_range); diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 657429b6181..c01b8c3fe25 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -83,7 +83,8 @@ static void inline writeCommittedBlockDataIntoStorage( AtomicReadWriteCtx & rw_ctx, TableStructureLockHolder & lock, ManageableStoragePtr & storage, - Block & block) + Block & block, + RegionAppliedStatus applied_status) { /// Write block into storage. // Release the alter lock so that writing does not block DDL operations @@ -97,7 +98,7 @@ static void inline writeCommittedBlockDataIntoStorage( static_cast(storage->engineType())); // Note: do NOT use typeid_cast, since Storage is multi-inherited and typeid_cast will return nullptr auto dm_storage = std::dynamic_pointer_cast(storage); - rw_ctx.write_result = dm_storage->write(block, rw_ctx.context.getSettingsRef()); + rw_ctx.write_result = dm_storage->write(block, rw_ctx.context.getSettingsRef(), applied_status); rw_ctx.write_part_cost = watch.elapsedMilliseconds(); GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write) .Observe(rw_ctx.write_part_cost / 1000.0); @@ -172,7 +173,12 @@ static inline bool atomicReadWrite( if constexpr (std::is_same_v) { RUNTIME_CHECK(block_ptr != nullptr); - writeCommittedBlockDataIntoStorage(rw_ctx, lock, storage, *block_ptr); + writeCommittedBlockDataIntoStorage( + rw_ctx, + lock, + storage, + *block_ptr, + {.region_id = region->id(), .applied_index = region->appliedIndex()}); storage->releaseDecodingBlock(block_decoding_schema_epoch, std::move(block_ptr)); } else diff --git a/dbms/src/Storages/KVStore/Types.h b/dbms/src/Storages/KVStore/Types.h index 791a2db1bcc..db0f1c68b00 100644 --- a/dbms/src/Storages/KVStore/Types.h +++ b/dbms/src/Storages/KVStore/Types.h @@ -75,4 +75,10 @@ using Timepoint = Clock::time_point; using Duration = Clock::duration; using Seconds = std::chrono::seconds; +struct RegionAppliedStatus +{ + RegionID region_id = InvalidRegionID; + UInt64 applied_index = 0; +}; + } // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9f27d70b936..b91f0dc2832 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -487,7 +487,10 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin return std::make_shared(getAndMaybeInitStore(), decorator, global_context, settings); } -WriteResult StorageDeltaMerge::write(Block & block, const Settings & settings) +WriteResult StorageDeltaMerge::write( + Block & block, + const Settings & settings, + const RegionAppliedStatus & applied_status) { auto & store = getAndMaybeInitStore(); #ifndef NDEBUG @@ -549,7 +552,7 @@ WriteResult StorageDeltaMerge::write(Block & block, const Settings & settings) FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_write_to_storage); - return store->write(global_context, settings, block); + return store->write(global_context, settings, block, applied_status); } std::unordered_set parseSegmentSet(const ASTPtr & ast) diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 90fc5496dcd..4abcfe4d34d 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -95,7 +95,7 @@ class StorageDeltaMerge BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; /// Write from raft layer. - DM::WriteResult write(Block & block, const Settings & settings); + DM::WriteResult write(Block & block, const Settings & settings, const RegionAppliedStatus & applied_status = {}); void flushCache(const Context & context) override;