Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAP: compute RU and match index and term when handling snapshot #8716

Merged
merged 15 commits into from
Jan 29, 2024
Merged
7 changes: 7 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru)
counter->Increment(ru);
}

UInt64 TiFlashMetrics::debugQueryReplicaSyncRU(UInt32 keyspace_id)
{
    // Debug/test helper: read back the replica-sync RU accumulated for `keyspace_id`.
    // Takes the same lock as addReplicaSyncRU so the counter map access is serialized.
    std::unique_lock guard(replica_sync_ru_mtx);
    return getReplicaSyncRUCounter(keyspace_id, guard)->Value();
}

prometheus::Counter * TiFlashMetrics::getReplicaSyncRUCounter(UInt32 keyspace_id, std::unique_lock<std::mutex> &)
{
auto itr = registered_keyspace_sync_replica_ru.find(keyspace_id);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ class TiFlashMetrics
static TiFlashMetrics & instance();

void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru);
UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id);

private:
TiFlashMetrics();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,12 @@ class DeltaMergeStore : private boost::noncopyable
return buildSegmentsFromCheckpointInfo(dm_context, range, checkpoint_info);
}

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info);

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const Context & db_context,
const DB::Settings & db_settings,
const DM::RowKeyRange & range,
Expand Down
19 changes: 14 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo(
return {};
}

void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
UInt64 DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info)
Expand All @@ -1202,18 +1202,25 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
"Ingest checkpoint from remote meet empty range, ignore, store_id={} region_id={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId());
return;
return 0;
}

auto restored_segments = checkpoint_info->getRestoredSegments();
auto updated_segments = ingestSegmentsUsingSplit(dm_context, range, restored_segments);
auto estimated_bytes = 0;

for (const auto & segment : restored_segments)
{
estimated_bytes += segment->getEstimatedBytes();
}

LOG_INFO(
log,
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={}",
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={} est_bytes={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId(),
restored_segments.size());

restored_segments.size(),
estimated_bytes);

WriteBatches wbs{*dm_context->storage_pool};
for (auto & segment : restored_segments)
Expand All @@ -1228,6 +1235,8 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
// TODO(fap) This could be executed in a dedicated thread if it consumes too much time.
for (auto & segment : updated_segments)
checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap);

return estimated_bytes;
}

} // namespace DM
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Storages/KVStore/FFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,19 @@ void HandleSafeTSUpdate(
uint64_t self_safe_ts,
uint64_t leader_safe_ts);
FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, uint64_t new_peer_id);
uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist);
FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id);
uint8_t ApplyFapSnapshot(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint8_t assert_exist,
uint64_t index,
uint64_t term);
FapSnapshotState QueryFapSnapshotState(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint64_t index,
uint64_t term);
void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id);
bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id);
}
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ void KVStore::onSnapshot(
}
else if constexpr (std::is_same_v<RegionPtrWrap, RegionPtrWithCheckpointInfo>)
{
dm_storage->ingestSegmentsFromCheckpointInfo(
auto ingested_bytes = dm_storage->ingestSegmentsFromCheckpointInfo(
new_key_range,
new_region_wrap.checkpoint_info,
context.getSettingsRef());
if (auto [count, is_syncing] = getTiFlashReplicaSyncInfo(dm_storage); is_syncing)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
{
// For writes, 1 RU is charged per KB. Reference: https://docs.pingcap.com/tidb/v7.0/tidb-resource-control
// Only one replica's worth of RU is accounted, so each replica reports 1/count of the consumption.
TiFlashMetrics::instance().addReplicaSyncRU(
keyspace_id,
std::ceil(static_cast<double>(ingested_bytes) / 1024.0 / count));
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,28 @@ bool CheckpointIngestInfo::forciblyClean(
TMTContext & tmt,
const TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
bool in_memory)
bool in_memory,
CleanReason reason)
{
auto log = DB::Logger::get();
// For most cases, ingest infos are deleted in `removeFromLocal`.
auto checkpoint_ptr = CheckpointIngestInfo::restore(tmt, proxy_helper, region_id, 0);
LOG_INFO(
log,
"Erase CheckpointIngestInfo from disk by force, region_id={} exist={} in_memory={}",
"Erase CheckpointIngestInfo from disk by force, region_id={} exist={} in_memory={} reason={}",
region_id,
checkpoint_ptr != nullptr,
in_memory);
in_memory,
magic_enum::enum_name(reason));
if (unlikely(checkpoint_ptr))
{
// First delete the page, it may cause dangling data.
// However, never point to incomplete data then.
removeFromLocal(tmt, region_id);
CheckpointIngestInfo::deleteWrittenData(
tmt,
checkpoint_ptr->getRegion(),
checkpoint_ptr->getRestoredSegments());
removeFromLocal(tmt, region_id);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,22 @@ struct CheckpointIngestInfo
UInt64 region_id,
UInt64 peer_id);

// Why a CheckpointIngestInfo (FAP snapshot) is being cleaned up.
// Passed to forciblyClean/cleanTask and rendered into logs via magic_enum::enum_name.
enum class CleanReason
{
    Success, // the FAP snapshot was successfully ingested (handleIngestCheckpoint completed)
    ProxyFallback, // proxy asked TiFlash to drop the FAP snapshot and fall back to a regular snapshot
    TiFlashCancel, // the FAP task was canceled on the TiFlash side
    ResolveStateApplySnapshot, // presumably: cleaned while resolving region state on (regular) snapshot apply — caller not visible here, TODO confirm
    ResolveStateDestroy, // presumably: cleaned while resolving region state on region destroy — caller not visible here, TODO confirm
};

// Only call to clean dangling CheckpointIngestInfo.
static bool forciblyClean(
TMTContext & tmt,
const TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
bool in_memory);
bool in_memory,
CleanReason reason);
static bool cleanOnSuccess(TMTContext & tmt, UInt64 region_id);

FastAddPeerProto::CheckpointIngestInfoPersisted serializeMeta() const;
Expand Down
93 changes: 76 additions & 17 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ FastAddPeerRes FastAddPeerImplWrite(
region_id,
keyspace_id,
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, false);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}
Expand All @@ -345,7 +345,7 @@ FastAddPeerRes FastAddPeerImplWrite(
region_id,
keyspace_id,
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, false);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}
Expand Down Expand Up @@ -376,7 +376,7 @@ FastAddPeerRes FastAddPeerImplWrite(
region_id,
keyspace_id,
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, false);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}
Expand Down Expand Up @@ -413,9 +413,11 @@ FastAddPeerRes FastAddPeerImpl(
auto elapsed = maybe_elapsed.value();
GET_METRIC(tiflash_fap_task_duration_seconds, type_queue_stage).Observe(elapsed / 1000.0);
GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Decrement();
// Consider phase1 -> restart -> phase1 -> fallback -> regular snapshot,
// We may find stale fap snapshot.
CheckpointIngestInfo::forciblyClean(tmt, proxy_helper, region_id, false);
// We don't delete the FAP snapshot if it exists. However, the following case could occur:
// - Phase 1 generates a FAP snapshot, and TiFlash restarts before it could send the faked snapshot.
// - Another phase 1 is started because the peer is not initialized.
// - That phase 1 falls back, leaving behind the FAP snapshot of the previous phase 1.
// It is OK to preserve the stale FAP snapshot, because we will compare (index, term) before pre/post apply.
auto res = FastAddPeerImplSelect(tmt, proxy_helper, region_id, new_peer_id);
if (std::holds_alternative<CheckpointRegionInfoAndData>(res))
{
Expand Down Expand Up @@ -458,7 +460,14 @@ FastAddPeerRes FastAddPeerImpl(
}
}

uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id)
uint8_t ApplyFapSnapshotImpl(
TMTContext & tmt,
TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
UInt64 peer_id,
bool assert_exist,
UInt64 index,
UInt64 term)
{
auto log = Logger::get("FastAddPeer");
Stopwatch watch_ingest;
Expand All @@ -467,6 +476,14 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
auto checkpoint_ingest_info = fap_ctx->getOrRestoreCheckpointIngestInfo(tmt, proxy_helper, region_id, peer_id);
if (!checkpoint_ingest_info)
{
if (assert_exist)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected to have fap snapshot, region_id={}, peer_id={}",
region_id,
peer_id);
}
// If fap is enabled, and this region is not currently exists on proxy's side,
// proxy will check if we have a fap snapshot first.
// If we don't, the snapshot should be a regular snapshot.
Expand All @@ -487,6 +504,30 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
peer_id,
begin);
}
// `region_to_ingest` is not the region in kvstore.
auto region_to_ingest = checkpoint_ingest_info->getRegion();
RUNTIME_CHECK(region_to_ingest != nullptr);
if (!(region_to_ingest->appliedIndex() == index && region_to_ingest->appliedIndexTerm() == term))
{
if (assert_exist)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatched region and term, expected=({},{}) actual=({},{}) region_id={} peer_id={} begin_time={}",
index,
term,
region_to_ingest->appliedIndex(),
region_to_ingest->appliedIndexTerm(),
region_id,
peer_id,
begin);
}
else
{
LOG_DEBUG(log, "Fap snapshot not match, maybe stale, region_id={}, peer_id={}", region_id, peer_id);
return false;
}
}
LOG_INFO(log, "Begin apply fap snapshot, region_id={} peer_id={} begin_time={}", region_id, peer_id, begin);
// If there is `checkpoint_ingest_info`, it is exactly the data we want to ingest. Consider two scene:
// 1. If there was a failed FAP which failed to clean, its data will be overwritten by current FAP which has finished phase 1.
Expand All @@ -495,7 +536,7 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Increment();
SCOPE_EXIT({ GET_METRIC(tiflash_fap_task_state, type_ingesting_stage).Decrement(); });
kvstore->handleIngestCheckpoint(checkpoint_ingest_info->getRegion(), checkpoint_ingest_info, tmt);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, true);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::Success);
GET_METRIC(tiflash_fap_task_duration_seconds, type_ingest_stage).Observe(watch_ingest.elapsedSeconds());
auto current = FAPAsyncTasks::getCurrentMillis();
auto elapsed = (current - begin) / 1000.0;
Expand All @@ -509,7 +550,12 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
}
}

FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id)
FapSnapshotState QueryFapSnapshotState(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint64_t index,
uint64_t term)
{
try
{
Expand All @@ -519,10 +565,14 @@ FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t
return FapSnapshotState::Other;
auto fap_ctx = server->tmt->getContext().getSharedContextDisagg()->fap_context;
// We just restore it, since if there is, it will soon be used.
if (fap_ctx->getOrRestoreCheckpointIngestInfo(*(server->tmt), server->proxy_helper, region_id, peer_id)
!= nullptr)
if (auto ptr
= fap_ctx->getOrRestoreCheckpointIngestInfo(*(server->tmt), server->proxy_helper, region_id, peer_id);
ptr != nullptr)
{
return FapSnapshotState::Persisted;
RUNTIME_CHECK(ptr->getRegion() != nullptr);
if (ptr->getRegion()->appliedIndex() == index && ptr->getRegion()->appliedIndexTerm() == term)
return FapSnapshotState::Persisted;
return FapSnapshotState::NotFound;
}
return FapSnapshotState::NotFound;
}
Expand All @@ -535,17 +585,21 @@ FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t
}
}

uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist)
uint8_t ApplyFapSnapshot(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint8_t assert_exist,
uint64_t index,
uint64_t term)
{
// TODO(fap) use assert_exist to check.
UNUSED(assert_exist);
try
{
RUNTIME_CHECK_MSG(server->tmt, "TMTContext is null");
RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null");
if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode())
return false;
return ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id);
return ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id, assert_exist, index, term);
}
catch (...)
{
Expand Down Expand Up @@ -696,7 +750,12 @@ void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id)
RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null");
if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode())
return;
CheckpointIngestInfo::forciblyClean(*(server->tmt), server->proxy_helper, region_id, false);
CheckpointIngestInfo::forciblyClean(
*(server->tmt),
server->proxy_helper,
region_id,
false,
CheckpointIngestInfo::CleanReason::ProxyFallback);
}
catch (...)
{
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,12 @@ FastAddPeerRes FastAddPeerImpl(
UInt64 region_id,
UInt64 new_peer_id,
UInt64 start_time);
uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id);
uint8_t ApplyFapSnapshotImpl(
TMTContext & tmt,
TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
UInt64 peer_id,
bool assert_exist,
UInt64 index,
UInt64 term);
} // namespace DB
Loading