diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index a0561a9c998f..efe78dfb74ed 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -26,6 +26,7 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext& Self->NextSeqno = 1; Self->NextChangeRecordOrder = 1; Self->LastChangeRecordGroup = 1; + Self->Pipeline.Reset(); Self->TransQueue.Reset(); Self->SnapshotManager.Reset(); Self->SchemaSnapshotManager.Reset(); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index d7afb3621bdb..0f7bc59da6b1 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -43,6 +43,34 @@ TPipeline::~TPipeline() } } +void TPipeline::Reset() { + ImmediateOps.clear(); + ActiveOps.clear(); + ActivePlannedOps.clear(); + DataTxCache.clear(); + DelayedAcks.clear(); + LastPlannedTx = {0, 0}; + LastCompleteTx = {0, 0}; + UtmostCompleteTx = {0, 0}; + KeepSchemaStep = 0; + LastCleanupTime = 0; + SchemaTx = nullptr; + ExecuteBlockers.clear(); + CandidateOps.clear(); + CandidateUnits.clear(); + NextActiveOp = {}; + SlowOpProfiles.clear(); + ActiveStreamingTxs.clear(); + PredictedPlan.clear(); + WaitingSchemeOpsOrder.clear(); + WaitingSchemeOps.clear(); + WaitingDataTxOps.clear(); + CommittingOps.Reset(); + CompletingOps.clear(); + WaitingDataReadIterators.clear(); + WaitingReadIteratorsById.clear(); +} + bool TPipeline::Load(NIceDb::TNiceDb& db) { using Schema = TDataShard::Schema; diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 705fc765966e..bd72d68a95e6 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -85,6 +85,7 @@ class TPipeline : TNonCopyable { TPipeline(TDataShard * self); ~TPipeline(); + void Reset(); bool Load(NIceDb::TNiceDb& db); void UpdateConfig(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TPipelineConfig& cfg); @@ -487,6 +488,11 @@ class TPipeline : TNonCopyable { } } + void Reset() { + TxIdMap.clear(); + ItemsSet.clear(); + } + inline bool HasOpsBelow(TRowVersion upperBound) const { return bool(ItemsSet) && *ItemsSet.begin() <= upperBound; }