diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 2d6c0a1aea16..0e200ad08412 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -122,8 +122,9 @@ class TKeyValueFlat : public TActor, public NTabletFlatExecutor:: TKeyValueFlat *Self; TVector TrashBeingCommitted; - TTxRequest(THolder intermediate, TKeyValueFlat *keyValueFlat) - : Intermediate(std::move(intermediate)) + TTxRequest(THolder intermediate, TKeyValueFlat *keyValueFlat, NWilson::TTraceId &&traceId) + : NTabletFlatExecutor::ITransaction(std::move(traceId)) + , Intermediate(std::move(intermediate)) , Self(keyValueFlat) { Intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_UNKNOWN); @@ -390,7 +391,7 @@ class TKeyValueFlat : public TActor, public NTabletFlatExecutor:: State.OnEvIntermediate(*(ev->Get()->Intermediate), ctx); auto traceId = ev->Get()->Intermediate->Span.GetTraceId(); - Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this), ctx, std::move(traceId)); + Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this, std::move(traceId)), ctx); } void Handle(TEvKeyValue::TEvNotify::TPtr &ev, const TActorContext &ctx) { diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 4f9f33e43863..2d2eca184568 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -895,8 +895,11 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq Counters->CreatedIterators->Inc(); ReadIdByTabletId[state->TabletId].push_back(id); + + NWilson::TTraceId traceId; // TODO: get traceId from kqp. + Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), - IEventHandle::FlagTrackDelivery); + IEventHandle::FlagTrackDelivery, 0, std::move(traceId)); if (!FirstShardStarted) { state->IsFirst = true; diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp index 1af617a924c8..028f4f1eebde 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.cpp +++ b/ydb/core/tablet_flat/flat_exec_seat.cpp @@ -9,14 +9,14 @@ namespace NTabletFlatExecutor { } Self->Complete(ctx); - TxSpan.Attribute("rw", isRW); - TxSpan.EndOk(); + Self->TxSpan.Attribute("rw", isRW); + Self->TxSpan.EndOk(); } void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept { Self->Terminate(reason, ctx); - TxSpan.EndError("Terminated"); + Self->TxSpan.EndError("Terminated"); } } // namespace NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h index 70515168326d..0e501da2405d 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.h +++ b/ydb/core/tablet_flat/flat_exec_seat.h @@ -17,12 +17,10 @@ namespace NTabletFlatExecutor { TSeat(const TSeat&) = delete; - TSeat(ui32 uniqId, TAutoPtr self, NWilson::TTraceId txTraceId) + TSeat(ui32 uniqId, TAutoPtr self) : UniqID(uniqId) , Self(self) - , TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction")) { - } void Describe(IOutputStream &out) const noexcept @@ -36,8 +34,12 @@ namespace NTabletFlatExecutor { void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept; - void CreateEnqueuedSpan() noexcept { - WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); + NWilson::TSpan CreateExecutionSpan() noexcept { + return NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Execute"); + } + + void StartEnqueuedSpan() noexcept { + WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); } void FinishEnqueuedSpan() noexcept { @@ -45,7 +47,7 @@ namespace NTabletFlatExecutor { } void CreatePendingSpan() noexcept { - WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Pending"); + WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Pending"); } void FinishPendingSpan() noexcept { @@ -53,12 +55,11 @@ namespace NTabletFlatExecutor { } NWilson::TTraceId GetTxTraceId() const noexcept { - return TxSpan.GetTraceId(); + return Self->TxSpan.GetTraceId(); } const ui64 UniqID = Max(); const TAutoPtr Self; - NWilson::TSpan TxSpan; NWilson::TSpan WaitingSpan; ui64 Retries = 0; TPinned Pinned; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index c3153b8f859d..645e5c2774e5 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -195,7 +195,7 @@ void TExecutor::RecreatePageCollectionsCache() noexcept auto &seat = xpair.second->Seat; xpair.second->WaitingSpan.EndOk(); LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; } @@ -520,7 +520,7 @@ void TExecutor::PlanTransactionActivation() { TAutoPtr seat = PendingQueue->Pop(); seat->FinishPendingSpan(); LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; --Stats->TxPending; @@ -541,7 +541,7 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue it->second->WaitingSpan.EndOk(); auto &seat = it->second->Seat; LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; TransactionWaitPads.erase(waitPad); @@ -1576,10 +1576,10 @@ bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction; } -void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId) { +void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, const TActorContext &ctx) { Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation"); - TAutoPtr seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId)); + TAutoPtr seat = new TSeat(++TransactionUniqCounter, self); LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self)); @@ -1624,7 +1624,7 @@ void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, cons if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); @@ -1634,12 +1634,12 @@ void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, cons ExecuteTransaction(seat, ctx); } -void TExecutor::Execute(TAutoPtr self, const TActorContext &ctx, NWilson::TTraceId traceId) { - DoExecute(self, true, ctx, std::move(traceId)); +void TExecutor::Execute(TAutoPtr self, const TActorContext &ctx) { + DoExecute(self, true, ctx); } -void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx, NWilson::TTraceId traceId) { - DoExecute(self, false, ctx, std::move(traceId)); +void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx) { + DoExecute(self, false, ctx); } void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ctx) { @@ -1653,14 +1653,14 @@ void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ct PrivatePageCache->ResetTouchesAndToLoad(true); TPageCollectionTxEnv env(*Database, *PrivatePageCache); - TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId); + TTransactionContext txc(*seat, Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId); txc.NotEnoughMemory(seat->NotEnoughMemoryCount); Database->Begin(Stamp(), env); LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID); - - NWilson::TSpan txExecuteSpan(TWilsonTablet::Tablet, seat->GetTxTraceId(), "Tablet.Transaction.Execute"); + + NWilson::TSpan txExecuteSpan = seat->CreateExecutionSpan(); const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId)); txExecuteSpan.EndOk(); @@ -1857,7 +1857,7 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & // then tx may be re-activated. if (!PrivatePageCache->GetStats().CurrentCacheMisses) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); @@ -2945,7 +2945,7 @@ void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept PostponedTransactions.erase(it); Memory->AcquiredMemory(*seat, task); LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->CreateEnqueuedSpan(); + seat->StartEnqueuedSpan(); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; PlanTransactionActivation(); diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 2d136d7bd957..594ccb0dbecd 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -629,9 +629,9 @@ class TExecutor void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override; void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override; void DetachTablet(const TActorContext &ctx) override; - void DoExecute(TAutoPtr transaction, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId); - void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override; - void Enqueue(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override; + void DoExecute(TAutoPtr transaction, bool allowImmediate, const TActorContext &ctx); + void Execute(TAutoPtr transaction, const TActorContext &ctx) override; + void Enqueue(TAutoPtr transaction, const TActorContext &ctx) override; TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false); TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); diff --git a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp index 24ad5c0a9d2b..95a8379b2647 100644 --- a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp @@ -184,13 +184,29 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( if (!Batch->Commit) { Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId()); } else { + const TAutoPtr &tx = seat->Self; + // Batch commit's TraceId will be used for all blobstorage requests of the batch. + if (!Batch->Commit->TraceId && tx->TxSpan) { + // It is possible that the original or consequent transactions didn't have a TraceId, + // but if a new transaction of a batch has TraceId, use it for the whole batch + // (and consequent traced transactions). + Batch->Commit->TraceId = seat->GetTxTraceId(); + } else { + tx->TxSpan.Link(Batch->Commit->TraceId, {}); + } + i64 batchSize = Batch->Bodies.size() + 1; - Batch->Commit->FirstTx->TxSpan.Attribute("BatchSize", batchSize); + for (TSeat* curSeat = Batch->Commit->FirstTx; curSeat != nullptr; curSeat = curSeat->NextCommitTx) { + // Update batch size of the transaction, whose TraceId the commit uses (first transaction in batch, that has TraceId). + if (curSeat->Self->TxSpan) { + curSeat->Self->TxSpan.Attribute("BatchSize", batchSize); + break; + } + } - seat->TxSpan - .Attribute("Batched", true) - .Link(Batch->Commit->FirstTx->GetTxTraceId()); + tx->TxSpan.Attribute("Batched", true) + .Attribute("BatchSize", batchSize); } Batch->Commit->PushTx(seat.Get()); diff --git a/ydb/core/tablet_flat/tablet_flat_executed.cpp b/ydb/core/tablet_flat/tablet_flat_executed.cpp index 895a1c802897..de09bd2f6bac 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executed.cpp @@ -29,14 +29,14 @@ IExecutor* TTabletExecutedFlat::CreateExecutor(const TActorContext &ctx) { return Executor(); } -void TTabletExecutedFlat::Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId) { +void TTabletExecutedFlat::Execute(TAutoPtr transaction, const TActorContext &ctx) { Y_UNUSED(ctx); - Execute(transaction, std::move(traceId)); + Execute(transaction); } -void TTabletExecutedFlat::Execute(TAutoPtr transaction, NWilson::TTraceId traceId) { +void TTabletExecutedFlat::Execute(TAutoPtr transaction) { if (transaction) - static_cast(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId)); + static_cast(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext)); } void TTabletExecutedFlat::EnqueueExecute(TAutoPtr transaction) { diff --git a/ydb/core/tablet_flat/tablet_flat_executed.h b/ydb/core/tablet_flat/tablet_flat_executed.h index 75d4cd7a0c15..e1a61e40393e 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.h +++ b/ydb/core/tablet_flat/tablet_flat_executed.h @@ -23,8 +23,8 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet { IExecutor* Executor() const { return Executor0; } const TInstant StartTime() const { return StartTime0; } - void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}); - void Execute(TAutoPtr transaction, NWilson::TTraceId traceId = {}); + void Execute(TAutoPtr transaction, const TActorContext &ctx); + void Execute(TAutoPtr transaction); void EnqueueExecute(TAutoPtr transaction); const NTable::TScheme& Scheme() const noexcept; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 656e8d1a7679..fa14d8bb7d88 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -25,6 +26,7 @@ namespace NTabletFlatExecutor { class TTransactionContext; class TExecutor; struct TPageCollectionTxEnv; +struct TSeat; class TTableSnapshotContext : public TThrRefBase, TNonCopyable { friend class TExecutor; @@ -200,9 +202,10 @@ class TTransactionContext : public TTxMemoryProviderBase { friend class TExecutor; public: - TTransactionContext(ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env, + TTransactionContext(TSeat &seat, ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env, ui64 memoryLimit, ui64 taskId) : TTxMemoryProviderBase(memoryLimit, taskId) + , Seat(seat) , Tablet(tablet) , Generation(gen) , Step(step) @@ -226,6 +229,7 @@ class TTransactionContext : public TTxMemoryProviderBase { } public: + TSeat& Seat; const ui64 Tablet = Max(); const ui32 Generation = Max(); const ui32 Step = Max(); @@ -275,6 +279,12 @@ class ITransaction : TNonCopyable { : Orbit(std::move(orbit)) { } + ITransaction(NWilson::TTraceId &&traceId) + : TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction")) + { + TxSpan.Attribute("Type", TypeName(*this)); + } + virtual ~ITransaction() = default; /// @return true if execution complete and transaction is ready for commit virtual bool Execute(TTransactionContext &txc, const TActorContext &ctx) = 0; @@ -290,8 +300,15 @@ class ITransaction : TNonCopyable { out << TypeName(*this); } + void SetupTxSpan(NWilson::TTraceId traceId) noexcept { + TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction"); + TxSpan.Attribute("Type", TypeName(*this)); + } + public: NLWTrace::TOrbit Orbit; + + NWilson::TSpan TxSpan; }; template @@ -310,6 +327,11 @@ class TTransactionBase : public ITransaction { : ITransaction(std::move(orbit)) , Self(self) { } + + TTransactionBase(T *self, NWilson::TTraceId &&traceId) + : ITransaction(std::move(traceId)) + , Self(self) + { } }; struct TExecutorStats { @@ -515,8 +537,8 @@ namespace NFlatExecutorSetup { // all followers had completed log with requested gc-barrier virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0; - virtual void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0; - virtual void Enqueue(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0; + virtual void Execute(TAutoPtr transaction, const TActorContext &ctx) = 0; + virtual void Enqueue(TAutoPtr transaction, const TActorContext &ctx) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at, std::function callback) = 0; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index cf3d9bd9d810..d537fce3f583 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2821,6 +2821,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr && Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx); } } + void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) { auto* msg = ev->Get(); const auto& record = msg->Record; @@ -4116,12 +4117,13 @@ bool TDataShard::ReassignChannelsEnabled() const { } void TDataShard::ExecuteProgressTx(const TActorContext& ctx) { - Execute(new TTxProgressTransaction(this), ctx); + Execute(new TTxProgressTransaction(this, {}, {}), ctx); } void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) { Y_ABORT_UNLESS(op->IsInProgress()); - Execute(new TTxProgressTransaction(this, std::move(op)), ctx); + NWilson::TTraceId traceId = op->GetTraceId(); + Execute(new TTxProgressTransaction(this, std::move(op), std::move(traceId)), ctx); } TDuration TDataShard::CleanupTimeout() const { diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index dba448cf6b4b..f4fd851519eb 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -952,6 +953,9 @@ struct TEvDataShard { // Orbit used for tracking request events NLWTrace::TOrbit Orbit; + + // Wilson span for this request. + NWilson::TSpan ReadSpan; }; struct TEvReadResult : public TEventPB + namespace NKikimr { namespace NDataShard { -TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op) - : TBase(self) +TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op, NWilson::TTraceId &&traceId) + : TBase(self, std::move(traceId)) , ActiveOp(std::move(op)) {} @@ -23,7 +25,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const return true; } - NIceDb::TNiceDb db(txc.DB); + NWilson::TSpan auxExecuteSpan; if (!ActiveOp) { const bool expireSnapshotsAllowed = ( @@ -44,6 +46,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const Self->Pipeline.ActivateWaitingTxOps(ctx); ActiveOp = Self->Pipeline.GetNextActiveOp(false); + if (!ActiveOp) { Self->IncCounter(COUNTER_TX_PROGRESS_IDLE); LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, @@ -56,6 +59,16 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const << ActiveOp->GetKind() << " " << *ActiveOp << " (unit " << ActiveOp->GetCurrentUnit() << ") at " << Self->TabletID()); ActiveOp->IncrementInProgress(); + + if (ActiveOp->OperationSpan) { + if (!TxSpan) { + // If Progress Tx for this operation is being executed the first time, + // it won't have a span, because we choose what operation to run in the transaction itself. + // We create transaction span and transaction execution spans here instead. + SetupTxSpan(ActiveOp->GetTraceId()); + auxExecuteSpan = txc.Seat.CreateExecutionSpan(); + } + } } Y_ABORT_UNLESS(ActiveOp && ActiveOp->IsInProgress()); @@ -68,6 +81,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const case EExecutionStatus::Restart: // Restart even if current CompleteList is not empty // It will be extended in subsequent iterations + auxExecuteSpan.EndOk(); return false; case EExecutionStatus::Reschedule: @@ -103,6 +117,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const } // Commit all side effects + auxExecuteSpan.EndOk(); return true; } catch (...) { Y_ABORT("there must be no leaked exceptions"); diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index 03e9ee9a0d6d..51fcf70f0827 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -15,15 +15,16 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel TEvDataShard::TEvProposeTransaction::TPtr &&ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed) - : TBase(self) + : TBase(self, std::move(ev->TraceId)) , Ev(std::move(ev)) , ReceivedAt(receivedAt) , TieBreakerIndex(tieBreakerIndex) , Kind(static_cast(Ev->Get()->GetTxKind())) , TxId(Ev->Get()->GetTxId()) , Acked(!delayed) - , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END) { + ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID())); } bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransactionContext &txc, @@ -76,7 +77,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa return true; } - TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx); + TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId()); // Unsuccessful operation parse. if (op->IsAborted()) { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index d5671dbac41e..6552f276d954 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1510,7 +1510,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { TReadIteratorId readId(Sender, Request->Record.GetReadId()); auto it = Self->ReadIterators.find(readId); if (it == Self->ReadIterators.end()) { - // the one who removed the iterator should have reply to user + // the one who removed the iterator should have replied to user LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " has been invalidated before TReadOperation::SendResult()"); return; @@ -1528,6 +1528,8 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + + Request->ReadSpan.EndError("Iterator aborted"); Self->DeleteReadIterator(it); return; } @@ -1546,6 +1548,8 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TReadOperation::Execute() finished with error, aborting: " << record.DebugString()); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + + Request->ReadSpan.EndError("Finished with error"); Self->DeleteReadIterator(it); return; } @@ -1597,7 +1601,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { Y_ABORT_UNLESS(it->second); - // note that we save the state only when there're unread queries + // Note that we save the state only when there are unread queries if (Reader->HasUnreadQueries()) { Reader->UpdateState(state, ResultSent); if (!state.IsExhausted()) { @@ -1612,6 +1616,8 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { } else { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " finished in read"); + + Request->ReadSpan.EndOk(); Self->DeleteReadIterator(it); } } @@ -1865,8 +1871,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB bool WaitComplete = false; public: - TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev) - : TBase(ds) + TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev, NWilson::TTraceId &&traceId) + : TBase(ds, std::move(traceId)) , Ev(std::move(ev)) , ReadId(Ev->Sender, Ev->Get()->Record.GetReadId()) {} @@ -1904,7 +1910,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Failed to sync follower: " << errMessage << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + Ev->Get()->ReadSpan); return true; } } @@ -1916,6 +1923,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB auto& state = *readIt->second; auto* request = Ev->Get(); const auto& record = request->Record; + NWilson::TSpan &readSpan = request->ReadSpan; Y_ABORT_UNLESS(state.State == TReadIteratorState::EState::Init); @@ -1930,7 +1938,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB << ", tableId: " << state.PathId.LocalPathId << ", from shard with owner: " << Self->GetPathOwnerId() << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } @@ -1941,7 +1950,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << tableId << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } @@ -1951,7 +1961,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Can't read from a backup table" << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } @@ -1960,7 +1971,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::UNSUPPORTED, TStringBuilder() << "Cannot use read iterators without mvcc" << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } @@ -2032,7 +2044,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() << (Self->IsFollower() ? " RO replica" : "") << " (node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } } @@ -2044,7 +2057,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::UNSUPPORTED, TStringBuilder() << "Followers don't support system table reads" << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } if (!state.IsHeadRead) { @@ -2052,7 +2066,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Cannot read system table using snapshot " << state.ReadVersion << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } if (record.GetTableId().GetTableId() >= TDataShard::Schema::MinLocalTid) { @@ -2060,7 +2075,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Cannot read from user tables using system tables" << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } if (record.GetResultFormat() != NKikimrDataEvents::FORMAT_CELLVEC) { @@ -2069,7 +2085,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB TStringBuilder() << "Unsupported result format " << (int)record.GetResultFormat() << " when reading from system tables" << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } if (record.GetTableId().HasSchemaVersion()) { @@ -2079,7 +2096,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB << ", localTid: " << record.GetTableId().GetTableId() << ", with schema: " << record.GetTableId().GetSchemaVersion() << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")", - ctx.SelfID.NodeId()); + ctx.SelfID.NodeId(), + readSpan); return true; } @@ -2089,6 +2107,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB const ui64 tieBreaker = Self->NextTieBreakerIndex++; Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev); + Op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, readSpan.GetTraceId(), "ReadIterator.ReadOperation", NWilson::EFlags::AUTO_END); + Op->BuildExecutionPlan(false); Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); @@ -2158,10 +2178,12 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB } } - void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message, ui32 nodeId) { + void ReplyError(Ydb::StatusIds::StatusCode code, const TString& message, ui32 nodeId, NWilson::TSpan &readSpan) { Reply = MakeEvReadResult(nodeId); SetStatusError(Reply->Record, code, message); Reply->Record.SetReadId(ReadId.ReadId); + + readSpan.EndError(message); } void Complete(const TActorContext& ctx) override { @@ -2175,6 +2197,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB Y_ABORT_UNLESS(it->second); auto& state = *it->second; SendViaSession(state.SessionId, ReadId.Sender, Self->SelfId(), Reply.release()); + // ReadSpan is already ended in ReplyError. Self->DeleteReadIterator(it); } return; @@ -2212,8 +2235,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase bool DelayedResult = false; public: - TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev) - : TBase(ds) + TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev, NWilson::TTraceId &&traceId) + : TBase(ds, std::move(traceId)) , Ev(ev) {} @@ -2433,7 +2456,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase } void SendResult(const TActorContext& ctx) { - const auto* request = Ev->Get(); + auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); Y_ABORT_UNLESS(it != Self->ReadIterators.end()); @@ -2448,6 +2471,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); + + state.Request->ReadSpan.EndError("Iterator aborted"); Self->DeleteReadIterator(it); return; } @@ -2460,6 +2485,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString()); + + state.Request->ReadSpan.EndError("Finished with error"); Self->DeleteReadIterator(it); return; } @@ -2498,20 +2525,30 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase } else { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " finished in ReadContinue"); + + state.Request->ReadSpan.EndOk(); Self->DeleteReadIterator(it); } } }; void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx) { - // note that ins some cases we mutate this request below + // Note that we mutate this request below auto* request = ev->Get(); + + if (!request->ReadSpan) { + request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "DataShard.Read"); + } + const auto& record = request->Record; if (Y_UNLIKELY(!record.HasReadId())) { + TString msg = TStringBuilder() << "Missing ReadId at shard " << TabletID(); + auto result = MakeEvReadResult(ctx.SelfID.NodeId()); - SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, - TStringBuilder() << "Missing ReadId at shard " << TabletID()); + SetStatusError(result->Record, Ydb::StatusIds::BAD_REQUEST, msg); ctx.Send(ev->Sender, result.release()); + + request->ReadSpan.EndError(msg); return; } @@ -2520,17 +2557,21 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct TReadIteratorId readId(ev->Sender, record.GetReadId()); if (!Pipeline.HandleWaitingReadIterator(readId, request)) { // This request has been cancelled + request->ReadSpan.EndError("Cancelled"); return; } auto replyWithError = [&] (auto code, const auto& msg) { auto result = MakeEvReadResult(ctx.SelfID.NodeId()); + SetStatusError( result->Record, code, msg); result->Record.SetReadId(readId.ReadId); ctx.Send(ev->Sender, result.release()); + + request->ReadSpan.EndError(msg); }; if (Y_UNLIKELY(Pipeline.HasWaitingReadIterator(readId) || ReadIterators.contains(readId))) { @@ -2549,6 +2590,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } if (MediatorStateWaiting) { + // TODO: save span LWTRACK(ReadWaitMediatorState, request->Orbit); Pipeline.RegisterWaitingReadIterator(readId, request); MediatorStateWaitingMsgs.emplace_back(ev.Release()); @@ -2557,6 +2599,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct } if (Pipeline.HasProposeDelayers()) { + // TODO: save span LWTRACK(ReadWaitProposeDelayers, request->Orbit); Pipeline.RegisterWaitingReadIterator(readId, request); DelayedProposeQueue.emplace_back().Reset(ev.Release()); @@ -2572,9 +2615,9 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } - size_t totalInFly = - ReadIteratorsInFly() + TxInFly() + ImmediateInFly() - + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting(); + size_t totalInFly = ReadIteratorsInFly() + TxInFly() + ImmediateInFly() + + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting(); + if (totalInFly > GetMaxTxInFly()) { replyWithError( Ydb::StatusIds::OVERLOADED, @@ -2651,16 +2694,17 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); - Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx); + Executor()->Execute(new TTxReadViaPipeline(this, ev, request->ReadSpan.GetTraceId()), ctx); } void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) { TReadIteratorId readId(ev->Get()->Reader, ev->Get()->ReadId); - if (Y_UNLIKELY(!ReadIterators.contains(readId))) { + auto it = ReadIterators.find(readId); + if (Y_UNLIKELY(it == ReadIterators.end())) { return; } - - Executor()->Execute(new TTxReadContinue(this, ev), ctx); + + Executor()->Execute(new TTxReadContinue(this, ev, it->second->Request->ReadSpan.GetTraceId()), ctx); } void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& ctx) { @@ -2722,6 +2766,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& result->Record.SetReadId(readId.ReadId); SendViaSession(state.SessionId, readId.Sender, SelfId(), result.release()); + // We definitely have Request in the read iterator state, because it's state is not Init. + state.Request->ReadSpan.EndError(issueStr); DeleteReadIterator(it); return; } @@ -2776,6 +2822,9 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte LWTRACK(ReadCancel, state->Orbit); + if (state->Request) { + state->Request->ReadSpan.EndError("Cancelled"); + } DeleteReadIterator(it); } diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp index 905178057b30..581ee461bdb5 100644 --- a/ydb/core/tx/datashard/datashard__write.cpp +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -10,14 +10,15 @@ LWTRACE_USING(DATASHARD_PROVIDER) namespace NKikimr::NDataShard { TDataShard::TTxWrite::TTxWrite(TDataShard* self, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed) - : TBase(self) + : TBase(self, std::move(ev->TraceId)) , Ev(std::move(ev)) , ReceivedAt(receivedAt) , TieBreakerIndex(tieBreakerIndex) , TxId(Ev->Get()->GetTxId()) , Acked(!delayed) - , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END) { + ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID())); } bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext& ctx) { @@ -70,7 +71,7 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext return true; } - TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx); + TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId()); // Unsuccessful operation parse. if (op->IsAborted()) { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 3cfa3b416308..4c4ce56bd3ed 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -312,30 +312,6 @@ void TValidatedDataTx::ComputeDeadline() { } } -// - -TActiveTransaction::TActiveTransaction(const TBasicOpInfo &op, - TValidatedDataTx::TPtr dataTx) - : TActiveTransaction(op) -{ - TrackMemory(); - FillTxData(dataTx); -} - -TActiveTransaction::TActiveTransaction(TDataShard *self, - TTransactionContext &txc, - const TActorContext &ctx, - const TBasicOpInfo &op, - const TActorId &target, - const TString &txBody, - const TVector &locks, - ui64 artifactFlags) - : TActiveTransaction(op) -{ - TrackMemory(); - FillTxData(self, txc, ctx, target, txBody, locks, artifactFlags); -} - TActiveTransaction::~TActiveTransaction() { UntrackMemory(); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index f10462cf7a36..347ab511d87e 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -389,17 +389,6 @@ class TActiveTransaction : public TOperation { TrackMemory(); } - TActiveTransaction(const TBasicOpInfo &op, - TValidatedDataTx::TPtr savedTx); - TActiveTransaction(TDataShard *self, - TTransactionContext &txc, - const TActorContext &ctx, - const TBasicOpInfo &op, - const TActorId &target, - const TString &txBody, - const TVector &locks, - ui64 artifactFlags); - ~TActiveTransaction(); void FillTxData(TValidatedDataTx::TPtr dataTx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c4bc0b6b0074..b49d871c8f28 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -52,6 +52,9 @@ #include #include +#include +#include +#include #include diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index c0888f272036..fa6e9ffdd2af 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1306,7 +1306,7 @@ void TPipeline::ForgetTx(ui64 txId) { TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::TPtr &ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext &txc, - const TActorContext &ctx) + const TActorContext &ctx, NWilson::TTraceId traceId) { auto &rec = ev->Get()->Record; Y_ABORT_UNLESS(!(rec.GetFlags() & TTxFlags::PrivateFlagsMask)); @@ -1323,6 +1323,7 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: tx->SetTxBody(rec.GetTxBody()); tx->SetCookie(ev->Cookie); tx->Orbit = std::move(ev->Get()->Orbit); + tx->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "ActiveTransaction", NWilson::EFlags::AUTO_END); auto malformed = [&](const TStringBuf txType, const TString& txBody) { const TString error = TStringBuilder() << "Malformed " << txType << " tx" @@ -1556,11 +1557,15 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } -TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) +TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& ev, + TInstant receivedAt, ui64 tieBreakerIndex, + NTabletFlatExecutor::TTransactionContext& txc, + const TActorContext& ctx, NWilson::TTraceId traceId) { const auto& rec = ev->Get()->Record; TBasicOpInfo info(rec.GetTxId(), EOperationKind::DataTx, EvWrite::Convertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex); auto op = MakeIntrusive(info, ev, Self, txc, ctx); + op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END); auto badRequest = [&](const TString& error) { op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error); diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 9e63590493c2..93229d369bf3 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -265,11 +265,11 @@ class TPipeline : TNonCopyable { TOperation::TPtr BuildOperation(TEvDataShard::TEvProposeTransaction::TPtr &ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext &txc, - const TActorContext &ctx); + const TActorContext &ctx, NWilson::TTraceId traceId); TOperation::TPtr BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr &ev, TInstant receivedAt, ui64 tieBreakerIndex, NTabletFlatExecutor::TTransactionContext &txc, - const TActorContext &ctx); + const TActorContext &ctx, NWilson::TTraceId traceId); void BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, const TActorContext &ctx); diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index ab313b9843d4..ae862657b5f5 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -67,7 +67,7 @@ class TDataShard::TTxPlanStep : public NTabletFlatExecutor::TTransactionBase { public: - explicit TTxProgressTransaction(TDataShard *self, TOperation::TPtr op = nullptr); + explicit TTxProgressTransaction(TDataShard *self, TOperation::TPtr op, NWilson::TTraceId &&traceId); bool Execute(TTransactionContext &txc, const TActorContext &ctx) override; void Complete(const TActorContext &ctx) override; TTxType GetTxType() const override { return TXTYPE_PROGRESS_START; } @@ -92,6 +92,10 @@ class TDataShard::TTxProposeTransactionBase : public NTabletFlatExecutor::TTrans void Complete(const TActorContext &ctx) override; TTxType GetTxType() const override { return TXTYPE_PROPOSE; } + NWilson::TTraceId GetTraceId() const noexcept { + return ProposeTransactionSpan.GetTraceId(); + } + private: bool SyncSchemeOnFollower(TOutputOpData::TResultPtr &result, TTransactionContext &txc, @@ -117,6 +121,10 @@ class TDataShard::TTxWrite: public NTabletFlatExecutor::TTransactionBase