diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index d9fc6acc86be..4457cac04e51 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1564,47 +1564,50 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& { const auto& rec = ev->Get()->Record; TBasicOpInfo info(rec.GetTxId(), EOperationKind::DataTx, EvWrite::Convertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex); - auto op = MakeIntrusive(info, ev, Self, txc, ctx); - op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END); + auto writeOp = MakeIntrusive(info, ev, Self, txc, ctx); + auto writeTx = writeOp->GetWriteTx(); + Y_ABORT_UNLESS(writeTx); + + writeOp->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END); auto badRequest = [&](const TString& error) { - op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID()); LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; - if (!op->GetWriteTx()->Ready()) { - badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << op->GetTxId() << ": " << op->GetWriteTx()->GetError()); - return op; + if (!writeTx->Ready()) { + badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << writeOp->GetTxId() << ": " << writeOp->GetWriteTx()->GetError()); + return writeOp; } - op->ExtractKeys(); + writeOp->ExtractKeys(); switch (rec.txmode()) { case NKikimrDataEvents::TEvWrite::MODE_PREPARE: break; case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: - op->SetVolatilePrepareFlag(); + writeOp->SetVolatilePrepareFlag(); break; case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: - op->SetImmediateFlag(); + writeOp->SetImmediateFlag(); break; default: badRequest(TStringBuilder() << "Unknown txmode: " << rec.txmode()); - return op; + return writeOp; } // Make config checks for immediate op. - if (op->IsImmediate()) { + if (writeOp->IsImmediate()) { if (Config.NoImmediate() || (Config.ForceOnlineRW())) { - LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate op " << op->GetTxId() << " to online according to config"); - op->SetForceOnlineFlag(); + LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate writeOp " << writeOp->GetTxId() << " to online according to config"); + writeOp->SetForceOnlineFlag(); } else { if (Config.DirtyImmediate()) - op->SetForceDirtyFlag(); + writeOp->SetForceDirtyFlag(); } } - return op; + return writeOp; } void TPipeline::BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, const TActorContext &ctx)