Skip to content

Commit

Permalink
Storages: log region applied status when write (#8932)
Browse files Browse the repository at this point in the history
ref #8864
  • Loading branch information
JinheLin authored Apr 11, 2024
1 parent 94f15d1 commit 08c3ea1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 9 deletions.
14 changes: 12 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,11 @@ Block DeltaMergeStore::addExtraColumnIfNeed(
return std::move(block);
}

DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_settings, Block & block)
DM::WriteResult DeltaMergeStore::write(
const Context & db_context,
const DB::Settings & db_settings,
Block & block,
const RegionAppliedStatus & applied_status)
{
LOG_TRACE(log, "Table write block, rows={} bytes={}", block.rows(), block.bytes());

Expand All @@ -554,7 +558,13 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set
{
dedup_ver.insert(v);
}
LOG_DEBUG(log, "Record count: {}, Versions: {}", block.rows(), dedup_ver);
LOG_DEBUG(
log,
"region_id: {}, applied_index: {}, record_count: {}, versions: {}",
applied_status.region_id,
applied_status.applied_index,
block.rows(),
dedup_ver);
}
const auto bytes = block.bytes();

Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,11 @@ class DeltaMergeStore : private boost::noncopyable

static Block addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block);

DM::WriteResult write(const Context & db_context, const DB::Settings & db_settings, Block & block);
DM::WriteResult write(
const Context & db_context,
const DB::Settings & db_settings,
Block & block,
const RegionAppliedStatus & applied_status = {});

void deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange & delete_range);

Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ static void inline writeCommittedBlockDataIntoStorage(
AtomicReadWriteCtx & rw_ctx,
TableStructureLockHolder & lock,
ManageableStoragePtr & storage,
Block & block)
Block & block,
RegionAppliedStatus applied_status)
{
/// Write block into storage.
// Release the alter lock so that writing does not block DDL operations
Expand All @@ -97,7 +98,7 @@ static void inline writeCommittedBlockDataIntoStorage(
static_cast<Int32>(storage->engineType()));
// Note: do NOT use typeid_cast, since Storage is multi-inherited and typeid_cast will return nullptr
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
rw_ctx.write_result = dm_storage->write(block, rw_ctx.context.getSettingsRef());
rw_ctx.write_result = dm_storage->write(block, rw_ctx.context.getSettingsRef(), applied_status);
rw_ctx.write_part_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write)
.Observe(rw_ctx.write_part_cost / 1000.0);
Expand Down Expand Up @@ -172,7 +173,12 @@ static inline bool atomicReadWrite(
if constexpr (std::is_same_v<ReadList, RegionDataReadInfoList>)
{
RUNTIME_CHECK(block_ptr != nullptr);
writeCommittedBlockDataIntoStorage(rw_ctx, lock, storage, *block_ptr);
writeCommittedBlockDataIntoStorage(
rw_ctx,
lock,
storage,
*block_ptr,
{.region_id = region->id(), .applied_index = region->appliedIndex()});
storage->releaseDecodingBlock(block_decoding_schema_epoch, std::move(block_ptr));
}
else
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,10 @@ using Timepoint = Clock::time_point;
using Duration = Clock::duration;
using Seconds = std::chrono::seconds;

struct RegionAppliedStatus
{
RegionID region_id = InvalidRegionID;
UInt64 applied_index = 0;
};

} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,10 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin
return std::make_shared<DMBlockOutputStream>(getAndMaybeInitStore(), decorator, global_context, settings);
}

WriteResult StorageDeltaMerge::write(Block & block, const Settings & settings)
WriteResult StorageDeltaMerge::write(
Block & block,
const Settings & settings,
const RegionAppliedStatus & applied_status)
{
auto & store = getAndMaybeInitStore();
#ifndef NDEBUG
Expand Down Expand Up @@ -549,7 +552,7 @@ WriteResult StorageDeltaMerge::write(Block & block, const Settings & settings)

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_write_to_storage);

return store->write(global_context, settings, block);
return store->write(global_context, settings, block, applied_status);
}

std::unordered_set<UInt64> parseSegmentSet(const ASTPtr & ast)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class StorageDeltaMerge
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;

/// Write from raft layer.
DM::WriteResult write(Block & block, const Settings & settings);
DM::WriteResult write(Block & block, const Settings & settings, const RegionAppliedStatus & applied_status = {});

void flushCache(const Context & context) override;

Expand Down

0 comments on commit 08c3ea1

Please sign in to comment.