77namespace NKikimr ::NColumnShard {
88
99class TColumnShard ::TTxProgressTx : public TTransactionBase<TColumnShard> {
10- private:
11- struct TEvent {
12- TActorId Target;
13- ui64 Cookie;
14- THolder<IEventBase> Event;
15-
16- TEvent (TActorId target, ui64 cookie, THolder<IEventBase> event)
17- : Target(target)
18- , Cookie(cookie)
19- , Event(std::move(event))
20- { }
21- };
22-
23- struct TResultEvent {
24- TTxController::TBasicTxInfo TxInfo;
25- NKikimrTxColumnShard::EResultStatus Status;
26-
27- TResultEvent (TTxController::TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status)
28- : TxInfo(std::move(txInfo))
29- , Status(status)
30- {}
31-
32- std::unique_ptr<IEventBase> MakeEvent (ui64 tabletId) const {
33- if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) {
34- auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited (tabletId, TxInfo.TxId );
35- return result;
36- } else {
37- auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
38- tabletId, TxInfo.TxKind , TxInfo.TxId , Status);
39- result->Record .SetStep (TxInfo.PlanStep );
40- return result;
41- }
42- }
43- };
44-
45- enum class ETriggerActivities {
46- NONE,
47- POST_INSERT,
48- POST_SCHEMA
49- };
50-
51- TStringBuilder TxPrefix () const {
52- return TStringBuilder () << " TxProgressTx[" << ToString (TabletTxNo) << " ] " ;
53- }
54-
55- TString TxSuffix () const {
56- return TStringBuilder () << " at tablet " << Self->TabletID ();
57- }
58-
5910public:
6011 TTxProgressTx (TColumnShard* self)
6112 : TTransactionBase(self)
@@ -81,60 +32,8 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
8132 ui64 step = plannedItem->PlanStep ;
8233 ui64 txId = plannedItem->TxId ;
8334
84- TTxController::TBasicTxInfo txInfo = *plannedItem;
85- switch (txInfo.TxKind ) {
86- case NKikimrTxColumnShard::TX_KIND_SCHEMA:
87- {
88- auto & meta = Self->AltersInFlight .at (txId);
89- Self->RunSchemaTx (meta.Body , NOlap::TSnapshot (step, txId), txc);
90- Self->ProtectSchemaSeqNo (meta.Body .GetSeqNo (), txc);
91- for (TActorId subscriber : meta.NotifySubscribers ) {
92- TxEvents.emplace_back (subscriber, 0 ,
93- MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID (), txId));
94- }
95- Self->AltersInFlight .erase (txId);
96- Trigger = ETriggerActivities::POST_SCHEMA;
97- break ;
98- }
99- case NKikimrTxColumnShard::TX_KIND_COMMIT: {
100- const auto & meta = Self->CommitsInFlight .at (txId);
101-
102- TBlobGroupSelector dsGroupSelector (Self->Info ());
103- NOlap::TDbWrapper dbTable (txc.DB , &dsGroupSelector);
104-
105- auto pathExists = [&](ui64 pathId) {
106- return Self->TablesManager .HasTable (pathId);
107- };
108-
109- auto counters = Self->InsertTable ->Commit (dbTable, step, txId, meta.WriteIds ,
110- pathExists);
111- Self->IncCounter (COUNTER_BLOBS_COMMITTED, counters.Rows );
112- Self->IncCounter (COUNTER_BYTES_COMMITTED, counters.Bytes );
113- Self->IncCounter (COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes );
114-
115- NIceDb::TNiceDb db (txc.DB );
116- for (TWriteId writeId : meta.WriteIds ) {
117- Self->RemoveLongTxWrite (db, writeId, txId);
118- }
119- Self->CommitsInFlight .erase (txId);
120- Self->UpdateInsertTableCounters ();
121- Trigger = ETriggerActivities::POST_INSERT;
122- break ;
123- }
124- case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
125- NOlap::TSnapshot snapshot (step, txId);
126- Y_ABORT_UNLESS (Self->OperationsManager ->CommitTransaction (*Self, txId, txc, snapshot));
127- Trigger = ETriggerActivities::POST_INSERT;
128- break ;
129- }
130- default : {
131- Y_ABORT (" Unexpected TxKind" );
132- }
133- }
134-
135- // Currently transactions never fail and there are no dependencies between them
136- TxResults.emplace_back (TResultEvent (std::move (txInfo), NKikimrTxColumnShard::SUCCESS));
137-
35+ TxOperator = Self->ProgressTxController ->GetVerifiedTxOperator (txId);
36+ AFL_VERIFY (TxOperator->Progress (*Self, NOlap::TSnapshot (step, txId), txc));
13837 Self->ProgressTxController ->FinishPlannedTx (txId, txc);
13938 Self->RescheduleWaitingReads ();
14039 }
@@ -148,24 +47,14 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
14847
14948 void Complete (const TActorContext& ctx) override {
15049 NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD)(" tablet_id" , Self->TabletID ())(" tx_state" , " complete" );
151-
152- for (auto & rec : TxEvents) {
153- ctx.Send (rec.Target , rec.Event .Release (), 0 , rec.Cookie );
154- }
155-
156- for (auto & res : TxResults) {
157- Self->ProgressTxController ->CompleteRunningTx (TTxController::TPlanQueueItem (res.TxInfo .PlanStep , res.TxInfo .TxId ));
158-
159- auto event = res.MakeEvent (Self->TabletID ());
160- ctx.Send (res.TxInfo .Source , event.release (), 0 , res.TxInfo .Cookie );
50+ if (TxOperator) {
51+ TxOperator->Complete (*Self, ctx);
16152 }
16253 Self->SetupIndexation ();
16354 }
16455
16556private:
166- std::vector<TResultEvent> TxResults;
167- std::vector<TEvent> TxEvents;
168- ETriggerActivities Trigger{ETriggerActivities::NONE};
57+ TTxController::ITransactionOperatior::TPtr TxOperator;
16958 const ui32 TabletTxNo;
17059};
17160
0 commit comments