From e1ce008572bb3c2aad249ee51dc5ccdd7e80c30d Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov Date: Sat, 28 Dec 2024 14:49:36 +0300 Subject: [PATCH] TxInFly and TxCompleteLag metrics for PQ (#13072) --- ydb/core/persqueue/pq_impl.cpp | 43 ++++++++++++++++++++++++++++--- ydb/core/persqueue/pq_impl.h | 4 +++ ydb/core/protos/counters_pq.proto | 2 ++ 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 6ba9ccf0226a..2bae93536260 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -997,6 +997,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& } Txs.emplace(tx.GetTxId(), tx); + SetTxInFlyCounter(); if (tx.HasStep()) { if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { @@ -1013,6 +1014,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& EndInitTransactions(); EndReadConfig(ctx); + + SetTxCounters(); } void TPersQueue::EndReadConfig(const TActorContext& ctx) @@ -3087,6 +3090,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { } MeteringSink.MayFlush(ctx.Now()); DeleteExpiredTransactions(ctx); + SetTxCounters(); ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } @@ -3107,6 +3111,33 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) TryWriteTxs(ctx); } +void TPersQueue::SetTxCounters() +{ + SetTxCompleteLagCounter(); + SetTxInFlyCounter(); +} + +void TPersQueue::SetTxCompleteLagCounter() +{ + ui64 lag = 0; + + if (!TxQueue.empty()) { + ui64 firstTxStep = TxQueue.front().first; + ui64 currentStep = TAppData::TimeProvider->Now().MilliSeconds(); + + if (currentStep > firstTxStep) { + lag = currentStep - firstTxStep; + } + } + + Counters->Simple()[COUNTER_PQ_TABLET_TX_COMPLETE_LAG] = lag; +} + +void TPersQueue::SetTxInFlyCounter() +{ + Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size(); +} + void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal"); @@ -3580,6 +3611,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord(); TDistributedTransaction& tx = Txs[event.GetTxId()]; + SetTxInFlyCounter(); switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: @@ -3661,6 +3693,7 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) TryExecuteTxs(ctx, tx); TxQueue.emplace_back(step, txId); + SetTxCompleteLagCounter(); } else { PQ_LOG_W("Transaction already planned for step " << tx.Step << ", Step: " << step << @@ -4400,6 +4433,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(), "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, TabletID(), tx.TxId, TxsOrder[tx.State].front()); + Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second, + "PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64, + TabletID(), tx.TxId, TxQueue.front().second); + TxQueue.pop_front(); + SetTxCompleteLagCounter(); SendEvReadSetAckToSenders(ctx, tx); @@ -4419,13 +4457,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::DELETING: // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. - if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - TxQueue.pop_front(); - } - DeleteWriteId(tx.WriteId); PQ_LOG_D("delete TxId " << tx.TxId); Txs.erase(tx.TxId); + SetTxInFlyCounter(); // If this was the last transaction, then you need to send responses to messages about changes // in the status of the PQ tablet (if they came) diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 524e5d02b282..d3441f76b262 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -443,6 +443,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteExpiredTransactions(const TActorContext& ctx); void Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); + void SetTxCounters(); + void SetTxCompleteLagCounter(); + void SetTxInFlyCounter(); + bool CanProcessProposeTransactionQueue() const; bool CanProcessPlanStepQueue() const; bool CanProcessWriteTxs() const; diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 8c975d9a4b7b..6aab74eb959f 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -68,6 +68,8 @@ enum ESimpleCounters { COUNTER_PQ_TABLET_OPENED_PIPES = 5 [(CounterOpts) = {Name: "OpenedPipes"}]; COUNTER_PQ_TABLET_INFLIGHT = 6 [(CounterOpts) = {Name: "RequestsInflight"}]; + COUNTER_PQ_TABLET_TX_COMPLETE_LAG = 7 [(CounterOpts) = {Name: "TxCompleteLag"}]; + COUNTER_PQ_TABLET_TX_IN_FLY = 8 [(CounterOpts) = {Name: "TxInFly"}]; } enum EPercentileCounters {