diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp index 8a45a5ef9ee8..b02c3f0e52fd 100644 --- a/ydb/core/tx/datashard/check_write_unit.cpp +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -52,7 +52,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, TWriteOperation* writeOp = dynamic_cast(op.Get()); Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind()); - auto writeTx = writeOp->WriteTx(); + auto writeTx = writeOp->GetWriteTx(); Y_ABORT_UNLESS(writeTx); Y_ABORT_UNLESS(writeTx->Ready() || writeTx->RequirePrepare()); @@ -66,7 +66,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); @@ -87,7 +87,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, << " bytes which exceeds limit " << NLimits::MaxWriteKeySize << " bytes at " << DataShard.TabletID(); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID()); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); @@ -103,7 +103,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, << "Transaction write column value of " << col.ImmediateUpdateSize << " bytes is larger than the allowed threshold"; - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID()); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); @@ -126,7 +126,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); @@ -143,7 +143,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, if (!Pipeline.AssignPlanInterval(op)) { TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID(); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err, DataShard.TabletID()); op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err); @@ -151,7 +151,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, return EExecutionStatus::Executed; } - writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}})); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}})); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID()); } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index a3384c1fce49..d681172b90ad 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2571,7 +2571,9 @@ class TDataShard TInstant StartedKeyAccessSamplingAt; TInstant StopKeyAccessSamplingAt; - THashMap TableInfos; // tableId -> local table info + using TTableInfos = THashMap; + + TTableInfos TableInfos; // tableId -> local table info TTransQueue TransQueue; TOutReadSets OutReadSets; TPipeline Pipeline; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index fa6e9ffdd2af..d9fc6acc86be 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1568,12 +1568,12 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END); auto badRequest = [&](const TString& error) { - op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error); + op->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID()); LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; - if (!op->WriteTx()->Ready()) { - badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << op->GetTxId() << ": " << op->WriteTx()->GetError()); + if (!op->GetWriteTx()->Ready()) { + badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << op->GetTxId() << ": " << op->GetWriteTx()->GetError()); return op; } diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 0b0d9a5b8804..2cd3af217566 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -17,18 +17,14 @@ namespace NKikimr { namespace NDataShard { - - TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev) - : StepTxId_(stepTxId) - , TabletId_(self->TabletID()) - , Ev_(ev) + : Ev(ev) , EngineBay(self, txc, ctx, stepTxId.ToPair()) - , ErrCode(NKikimrTxDataShard::TError::OK) + , StepTxId(stepTxId) + , ReceivedAt(receivedAt) , TxSize(0) - , TxCacheUsage(0) + , ErrCode(NKikimrTxDataShard::TError::OK) , IsReleased(false) - , ReceivedAt_(receivedAt) { ComputeTxSize(); NActors::NMemory::TLabel::Add(TxSize); @@ -43,12 +39,12 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId_ << " at " << TabletId() << ", record: " << Record().ShortDebugString()); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << self->TabletID() << ", record: " << GetRecord().ShortDebugString()); - if (!ParseRecord(self)) + if (!ParseRecord(self->TableInfos)) return; - SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, ctx); + SetTxKeys(RecordOperation().GetColumnIds(), typeRegistry, self->TabletID(), ctx); KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); EngineBay.MarkTxLoaded(); @@ -58,8 +54,8 @@ TValidatedWriteTx::~TValidatedWriteTx() { NActors::NMemory::TLabel::Sub(TxSize); } -bool TValidatedWriteTx::ParseRecord(TDataShard* self) { - if (Record().GetOperations().size() != 1) +bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) { + if (GetRecord().GetOperations().size() != 1) { ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; ErrStr = TStringBuilder() << "Only one operation is supported now."; @@ -68,19 +64,19 @@ bool TValidatedWriteTx::ParseRecord(TDataShard* self) { const NKikimrDataEvents::TTableId& tableIdRecord = RecordOperation().GetTableId(); - auto tableInfoPtr = self->TableInfos.FindPtr(tableIdRecord.GetTableId()); + auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId()); if (!tableInfoPtr) { ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist"; return false; } - TableInfo_ = tableInfoPtr->Get(); - Y_ABORT_UNLESS(TableInfo_); + TableInfo = tableInfoPtr->Get(); + Y_ABORT_UNLESS(TableInfo); - if (TableInfo_->GetTableSchemaVersion() != 0 && tableIdRecord.GetSchemaVersion() != TableInfo_->GetTableSchemaVersion()) + if (TableInfo->GetTableSchemaVersion() != 0 && tableIdRecord.GetSchemaVersion() != TableInfo->GetTableSchemaVersion()) { ErrCode = NKikimrTxDataShard::TError::SCHEME_CHANGED; - ErrStr = TStringBuilder() << "Table '" << TableInfo_->Path << "' scheme changed."; + ErrStr = TStringBuilder() << "Table '" << TableInfo->Path << "' scheme changed."; return false; } @@ -91,10 +87,10 @@ bool TValidatedWriteTx::ParseRecord(TDataShard* self) { return false; } - NEvWrite::TPayloadHelper payloadHelper(*Ev_->Get()); + NEvWrite::TPayloadHelper payloadHelper(*Ev->Get()); TString payload = payloadHelper.GetDataFromPayload(RecordOperation().GetPayloadIndex()); - if (!TSerializedCellMatrix::TryParse(payload, Matrix_)) + if (!TSerializedCellMatrix::TryParse(payload,Matrix)) { ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; ErrStr = TStringBuilder() << "Can't parse TSerializedCellVec in payload"; @@ -102,22 +98,22 @@ bool TValidatedWriteTx::ParseRecord(TDataShard* self) { } const auto& columnTags = RecordOperation().GetColumnIds(); - if ((size_t)columnTags.size() != Matrix_.GetColCount()) + if ((size_t)columnTags.size() != Matrix.GetColCount()) { ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; - ErrStr = TStringBuilder() << "Column count mismatch: got columnids " << columnTags.size() << ", got cells count " << Matrix_.GetColCount(); + ErrStr = TStringBuilder() << "Column count mismatch: got columnids " << columnTags.size() << ", got cells count " <KeyColumnIds.size()) + if ((size_t)columnTags.size() < TableInfo->KeyColumnIds.size()) { ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; - ErrStr = TStringBuilder() << "Column count mismatch: got " << columnTags.size() << ", expected greater or equal than key column count " << TableInfo_->KeyColumnIds.size(); + ErrStr = TStringBuilder() << "Column count mismatch: got " << columnTags.size() << ", expected greater or equal than key column count " << TableInfo->KeyColumnIds.size(); return false; } - for (size_t i = 0; i < TableInfo_->KeyColumnIds.size(); ++i) { - if (RecordOperation().columnids(i) != TableInfo_->KeyColumnIds[i]) { + for (size_t i = 0; i < TableInfo->KeyColumnIds.size(); ++i) { + if (RecordOperation().columnids(i) != TableInfo->KeyColumnIds[i]) { ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; ErrStr = TStringBuilder() << "Key column schema at position " << i; return false; @@ -125,7 +121,7 @@ bool TValidatedWriteTx::ParseRecord(TDataShard* self) { } for (ui32 columnTag : columnTags) { - auto* col = TableInfo_->Columns.FindPtr(columnTag); + auto* col = TableInfo->Columns.FindPtr(columnTag); if (!col) { ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR; ErrStr = TStringBuilder() << "Missing column with id " << columnTag; @@ -133,7 +129,7 @@ bool TValidatedWriteTx::ParseRecord(TDataShard* self) { } } - TableId_ = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion()); + TableId = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion()); return true; } @@ -150,17 +146,17 @@ TVector GetColumnWrites(const ::google::protobuf:: return writeColumns; } -void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx) +void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx) { TVector keyCells; - for (ui32 rowIdx = 0; rowIdx < Matrix_.GetRowCount(); ++rowIdx) + for (ui32 rowIdx = 0; rowIdx KeyColumnIds.size() - 1, keyCells); + Matrix.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo->KeyColumnIds.size() - 1, keyCells); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo_->Path << ", shard: " << TabletId_ << ", " - << "write point " << DebugPrintPoint(TableInfo_->KeyColumnTypes, keyCells, typeRegistry)); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", " + << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry)); TTableRange tableRange(keyCells); - EngineBay.AddWriteRange(TableId_, tableRange, TableInfo_->KeyColumnTypes, GetColumnWrites(columnTags), false); + EngineBay.AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); } } @@ -219,21 +215,21 @@ void TValidatedWriteTx::ComputeTxSize() { TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) : TOperation(op) - , Ev_(ev) + , Ev(ev) , ArtifactFlags(0) , TxCacheUsage(0) , ReleasedTxDataSize(0) , SchemeShardId(0) , SubDomainPathId(0) { - SetTarget(Ev_->Sender); - SetCookie(Ev_->Cookie); - Orbit = std::move(Ev_->Get()->MoveOrbit()); + SetTarget(Ev->Sender); + SetCookie(Ev->Cookie); + Orbit = std::move(Ev->Get()->MoveOrbit()); BuildWriteTx(self, txc, ctx); // First time parsing, so we can fail - Y_DEBUG_ABORT_UNLESS(WriteTx_->Ready()); + Y_DEBUG_ABORT_UNLESS(WriteTx->Ready()); TrackMemory(); } @@ -245,30 +241,30 @@ TWriteOperation::~TWriteOperation() void TWriteOperation::FillTxData(TValidatedWriteTx::TPtr writeTx) { - Y_ABORT_UNLESS(!WriteTx_); - Y_ABORT_UNLESS(!Ev_ || HasVolatilePrepareFlag()); + Y_ABORT_UNLESS(!WriteTx); + Y_ABORT_UNLESS(!Ev || HasVolatilePrepareFlag()); - Target = writeTx->Source(); - WriteTx_ = writeTx; + Target = writeTx->GetSource(); + WriteTx = writeTx; } void TWriteOperation::FillTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TActorId& target, NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TVector& locks, ui64 artifactFlags) { UntrackMemory(); - Y_ABORT_UNLESS(!WriteTx_); - Y_ABORT_UNLESS(!Ev_); + Y_ABORT_UNLESS(!WriteTx); + Y_ABORT_UNLESS(!Ev); Target = target; - Ev_ = std::move(ev); + Ev = std::move(ev); if (locks.size()) { for (auto lock : locks) LocksCache().Locks[lock.LockId] = lock; } ArtifactFlags = artifactFlags; - Y_ABORT_UNLESS(!WriteTx_); + Y_ABORT_UNLESS(!WriteTx); BuildWriteTx(self, txc, ctx); - Y_ABORT_UNLESS(WriteTx_->Ready()); + Y_ABORT_UNLESS(WriteTx->Ready()); TrackMemory(); } @@ -277,11 +273,11 @@ void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& { UntrackMemory(); - Y_ABORT_UNLESS(!WriteTx_); - Y_ABORT_UNLESS(Ev_); + Y_ABORT_UNLESS(!WriteTx); + Y_ABORT_UNLESS(Ev); BuildWriteTx(self, txc, ctx); - Y_ABORT_UNLESS(WriteTx_->Ready()); + Y_ABORT_UNLESS(WriteTx->Ready()); TrackMemory(); @@ -289,24 +285,24 @@ void TWriteOperation::FillVolatileTxData(TDataShard* self, TTransactionContext& TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) { - if (!WriteTx_) { - Y_ABORT_UNLESS(Ev_); - WriteTx_ = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_); + if (!WriteTx) { + Y_ABORT_UNLESS(Ev); + WriteTx = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev); } - return WriteTx_; + return WriteTx; } void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider, const TActorContext& ctx) { ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory(); - if (!WriteTx_ || WriteTx_->IsTxDataReleased()) + if (!WriteTx || IsTxDataReleased()) return; - WriteTx_->ReleaseTxData(); + WriteTx->ReleaseTxData(); // Immediate transactions have no body stored. if (!IsImmediate() && !HasVolatilePrepareFlag()) { UntrackMemory(); - Ev_.Reset(); + Ev.Reset(); TrackMemory(); } @@ -319,7 +315,7 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data"); } -void TWriteOperation::DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +void TWriteOperation::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx) { using Schema = TDataShard::Schema; @@ -337,25 +333,25 @@ void TWriteOperation::DbStoreLocksAccessLog(TDataShard* self, TTransactionContex TStringBuf vecData(vecDataStart, vecDataSize); db.Table().Key(GetTxId()).Update(NIceDb::TUpdate(vecData)); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << self->TabletID()); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << tabletId); } -void TWriteOperation::DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) +void TWriteOperation::DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx) { using Schema = TDataShard::Schema; NIceDb::TNiceDb db(txc.DB); db.Table().Key(GetTxId()).Update(ArtifactFlags); - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << self->TabletID()); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << tabletId); } ui64 TWriteOperation::GetMemoryConsumption() const { ui64 res = 0; - if (WriteTx_) { - res += WriteTx_->GetTxSize() + WriteTx_->GetMemoryAllocated(); + if (WriteTx) { + res += WriteTx->GetTxSize() + WriteTx->GetMemoryAllocated(); } - if (Ev_) { + if (Ev) { res += sizeof(NEvents::TDataEvents::TEvWrite); } @@ -374,7 +370,7 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( Y_UNUSED(ctx); Y_ABORT(); /* - if (!WriteTx_) { + if (!WriteTx) { ReleasedTxDataSize = 0; return ERestoreDataStatus::Ok; } @@ -382,20 +378,20 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( UntrackMemory(); // For immediate transactions we should restore just - // from the Ev_. For planned transaction we should + // from the Ev. For planned transaction we should // restore from local database. TVector locks; if (!IsImmediate() && !HasVolatilePrepareFlag()) { NIceDb::TNiceDb db(txc.DB);ExtractKeys - bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev_, locks, ArtifactFlags); + bool ok = self->TransQueue.LoadTxDetails(db, GetTxId(), Target, Ev, locks, ArtifactFlags); if (!ok) { - Ev_.Reset(); + Ev.Reset(); ArtifactFlags = 0; return ERestoreDataStatus::Restart; } } else { - Y_ABORT_UNLESS(Ev_); + Y_ABORT_UNLESS(Ev); } TrackMemory(); @@ -403,13 +399,13 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( for (auto& lock : locks) LocksCache().Locks[lock.LockId] = lock; - bool extractKeys = WriteTx_->IsTxInfoLoaded(); - WriteTx_ = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev_); - if (WriteTx_->Ready() && extractKeys) { - WriteTx_->ExtractKeys(true); + bool extractKeys = WriteTx->IsTxInfoLoaded(); + WriteTx = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev); + if (WriteTx->Ready() && extractKeys) { + WriteTx->ExtractKeys(true); } - if (!WriteTx_->Ready()) { + if (!WriteTx->Ready()) { return ERestoreDataStatus::Error; } @@ -484,20 +480,20 @@ THolder CreateFinalizeWriteTxPlanUnit(TDataShard& dataShard, TPi void TWriteOperation::TrackMemory() const { // TODO More accurate calc memory - NActors::NMemory::TLabel::Add(Record().SpaceUsed()); + NActors::NMemory::TLabel::Add(GetRecord().SpaceUsed()); } void TWriteOperation::UntrackMemory() const { - NActors::NMemory::TLabel::Sub(Record().SpaceUsed()); + NActors::NMemory::TLabel::Sub(GetRecord().SpaceUsed()); } -void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) { +void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId) { SetAbortedFlag(); - WriteResult_ = NEvents::TDataEvents::TEvWriteResult::BuildError(WriteTx_->TabletId(), GetTxId(), status, errorMsg); + WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(tabletId, GetTxId(), status, errorMsg); } void TWriteOperation::SetWriteResult(std::unique_ptr&& writeResult) { - WriteResult_ = std::move(writeResult); + WriteResult = std::move(writeResult); } void TWriteOperation::BuildExecutionPlan(bool loaded) diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 1328bc6eeb0b..0e3316aa6c4f 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -33,44 +33,27 @@ class TValidatedWriteTx: TNonCopyable { return ErrStr; } - TStepOrder StepTxId() const { - return StepTxId_; - } - ui64 TxId() const { - return StepTxId_.TxId; - } - ui64 TabletId() const { - return TabletId_; - } - const NEvents::TDataEvents::TEvWrite::TPtr& Ev() const { - return Ev_; + const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const { + return Ev; } - const NKikimrDataEvents::TEvWrite& Record() const { - return Ev_->Get()->Record; + const NKikimrDataEvents::TEvWrite& GetRecord() const { + return Ev->Get()->Record; } const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const { - //TODO Only one operation is supported now - return Record().operations(0); - } - - const TTableId& TableId() const { - return TableId_; - } - - const TSerializedCellMatrix Matrix() const { - return Matrix_; + Y_ABORT_UNLESS(GetRecord().operations().size() == 1, "Only one operation is supported now"); + return GetRecord().operations(0); } ui64 LockTxId() const { - return Record().locktxid(); + return GetRecord().locktxid(); } ui32 LockNodeId() const { - return Record().locknodeid(); + return GetRecord().locknodeid(); } bool Immediate() const { - return Record().txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE; + return GetRecord().txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE; } bool NeedDiagnostics() const { return true; @@ -78,13 +61,6 @@ class TValidatedWriteTx: TNonCopyable { bool CollectStats() const { return true; } - TInstant ReceivedAt() const { - return ReceivedAt_; - } - TInstant Deadline() const { - return Deadline_; - } - bool Ready() const { return ErrCode == NKikimrTxDataShard::TError::OK; } @@ -163,47 +139,25 @@ class TValidatedWriteTx: TNonCopyable { return EngineBay.GetVolatileCommitOrdered(); } - TActorId Source() const { - return Source_; - } - void SetSource(const TActorId& actorId) { - Source_ = actorId; - } - void SetStep(ui64 step) { - StepTxId_.Step = step; - } bool IsProposed() const { - return Source_ != TActorId(); + return Source != TActorId(); } inline const ::NKikimrDataEvents::TKqpLocks& GetKqpLocks() const { - return Record().locks(); + return GetRecord().locks(); } - bool ParseRecord(TDataShard* self); - void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx); + bool ParseRecord(const TDataShard::TTableInfos& tableInfos); + void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds, const NScheme::TTypeRegistry& typeRegistry, ui64 tabletId, const TActorContext& ctx); ui32 ExtractKeys(bool allowErrors); bool ReValidateKeys(); - ui64 GetTxSize() const { - return TxSize; - } ui32 KeysCount() const { return TxInfo().WritesCount; } - void SetTxCacheUsage(ui64 val) { - TxCacheUsage = val; - } - ui64 GetTxCacheUsage() const { - return TxCacheUsage; - } - void ReleaseTxData(); - bool IsTxDataReleased() const { - return IsReleased; - } bool IsTxInfoLoaded() const { return TxInfo().Loaded; @@ -221,23 +175,24 @@ class TValidatedWriteTx: TNonCopyable { } private: - //TODO: YDB_READONLY - TStepOrder StepTxId_; - ui64 TabletId_; - TTableId TableId_; - const TUserTable* TableInfo_; - const NEvents::TDataEvents::TEvWrite::TPtr& Ev_; - TSerializedCellMatrix Matrix_; - TActorId Source_; + const NEvents::TDataEvents::TEvWrite::TPtr& Ev; TEngineBay EngineBay; - NKikimrTxDataShard::TError::EKind ErrCode; - TString ErrStr; - ui64 TxSize; - ui64 TxCacheUsage; - bool IsReleased; - const TInstant ReceivedAt_; // For local timeout tracking - TInstant Deadline_; + YDB_ACCESSOR_DEF(TActorId, Source); + + YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0)); + YDB_READONLY_DEF(TTableId, TableId); + YDB_READONLY_DEF(TSerializedCellMatrix, Matrix); + YDB_READONLY_DEF(TInstant, ReceivedAt); + + YDB_READONLY_DEF(ui64, TxSize); + + YDB_READONLY_DEF(NKikimrTxDataShard::TError::EKind, ErrCode); + YDB_READONLY_DEF(TString, ErrStr); + YDB_READONLY_DEF(bool, IsReleased); + + const TUserTable* TableInfo; +private: void ComputeTxSize(); }; @@ -253,40 +208,19 @@ class TWriteOperation : public TOperation { void FillVolatileTxData(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const { - return Ev_; + return Ev; } void SetEv(const NEvents::TDataEvents::TEvWrite::TPtr& ev) { UntrackMemory(); - Ev_ = ev; + Ev = ev; TrackMemory(); } void ClearEv() { UntrackMemory(); - Ev_.Reset(); + Ev.Reset(); TrackMemory(); } - ui64 GetSchemeShardId() const { - return SchemeShardId; - } - void SetSchemeShardId(ui64 id) { - SchemeShardId = id; - } - ui64 GetSubDomainPathId() const { - return SubDomainPathId; - } - void SetSubDomainPathId(ui64 pathId) { - SubDomainPathId = pathId; - } - - const NKikimrSubDomains::TProcessingParams& GetProcessingParams() const { - return ProcessingParams; - } - void SetProcessingParams(const NKikimrSubDomains::TProcessingParams& params) - { - ProcessingParams.CopyFrom(params); - } - void Deactivate() override { ClearEv(); @@ -294,27 +228,17 @@ class TWriteOperation : public TOperation { } ui32 ExtractKeys() { - return WriteTx_ ? WriteTx_->ExtractKeys(false) : 0; + return WriteTx ? WriteTx->ExtractKeys(false) : 0; } bool ReValidateKeys() { - return WriteTx_ ? WriteTx_->ReValidateKeys() : true; + return WriteTx ? WriteTx->ReValidateKeys() : true; } void MarkAsUsingSnapshot() { SetUsingSnapshotFlag(); } - void SetTxCacheUsage(ui64 val) { - TxCacheUsage = val; - } - ui64 GetTxCacheUsage() const { - return TxCacheUsage; - } - - ui64 GetReleasedTxDataSize() const { - return ReleasedTxDataSize; - } bool IsTxDataReleased() const { return ReleasedTxDataSize > 0; } @@ -339,8 +263,8 @@ class TWriteOperation : public TOperation { return ArtifactFlags & LOCKS_STORED; } - void DbStoreLocksAccessLog(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); - void DbStoreArtifactFlags(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); + void DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx); + void DbStoreArtifactFlags(ui64 tabletId, TTransactionContext& txc, const TActorContext& ctx); ui64 GetMemoryConsumption() const; @@ -360,13 +284,13 @@ class TWriteOperation : public TOperation { void BuildExecutionPlan(bool loaded) override; bool HasKeysInfo() const override { - return WriteTx_ ? WriteTx_->TxInfo().Loaded : false; + return WriteTx ? WriteTx->TxInfo().Loaded : false; } const NMiniKQL::IEngineFlat::TValidationInfo& GetKeysInfo() const override { - if (WriteTx_) { - Y_ABORT_UNLESS(WriteTx_->TxInfo().Loaded); - return WriteTx_->TxInfo(); + if (WriteTx) { + Y_ABORT_UNLESS(WriteTx->TxInfo().Loaded); + return WriteTx->TxInfo(); } // For scheme tx global reader and writer flags should // result in all required dependencies. @@ -374,42 +298,42 @@ class TWriteOperation : public TOperation { } ui64 LockTxId() const override { - return WriteTx_ ? WriteTx_->LockTxId() : 0; + return WriteTx ? WriteTx->LockTxId() : 0; } ui32 LockNodeId() const override { - return WriteTx_ ? WriteTx_->LockNodeId() : 0; + return WriteTx ? WriteTx->LockNodeId() : 0; } bool HasLockedWrites() const override { - return WriteTx_ ? WriteTx_->HasLockedWrites() : false; + return WriteTx ? WriteTx->HasLockedWrites() : false; } ui64 IncrementPageFaultCount() { return ++PageFaultCount; } - const TValidatedWriteTx::TPtr& WriteTx() const { - return WriteTx_; + const TValidatedWriteTx::TPtr& GetWriteTx() const { + return WriteTx; } TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); void ClearWriteTx() { - WriteTx_ = nullptr; + WriteTx = nullptr; } - const NKikimrDataEvents::TEvWrite& Record() const { - return Ev_->Get()->Record; + const NKikimrDataEvents::TEvWrite& GetRecord() const { + return Ev->Get()->Record; } - const std::unique_ptr& WriteResult() const { - return WriteResult_; + const std::unique_ptr& GetWriteResult() const { + return WriteResult; } - std::unique_ptr&& WriteResult() { - return std::move(WriteResult_); + std::unique_ptr&& ReleaseWriteResult() { + return std::move(WriteResult); } - void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg); + void SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg, ui64 tabletId); void SetWriteResult(std::unique_ptr&& writeResult); private: @@ -417,17 +341,17 @@ class TWriteOperation : public TOperation { void UntrackMemory() const; private: - NEvents::TDataEvents::TEvWrite::TPtr Ev_; - TValidatedWriteTx::TPtr WriteTx_; - std::unique_ptr WriteResult_; - - // TODO: move to persistent part of operation's flags - ui64 ArtifactFlags; - ui64 TxCacheUsage; - ui64 ReleasedTxDataSize; - ui64 SchemeShardId; - ui64 SubDomainPathId; - NKikimrSubDomains::TProcessingParams ProcessingParams; + NEvents::TDataEvents::TEvWrite::TPtr Ev; + TValidatedWriteTx::TPtr WriteTx; + std::unique_ptr WriteResult; + + YDB_READONLY_DEF(ui64, ArtifactFlags); + YDB_ACCESSOR_DEF(ui64, TxCacheUsage); + YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize); + YDB_ACCESSOR_DEF(ui64, SchemeShardId); + YDB_ACCESSOR_DEF(ui64, SubDomainPathId); + YDB_ACCESSOR_DEF(NKikimrSubDomains::TProcessingParams, ProcessingParams); + ui64 PageFaultCount = 0; }; diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp index e6d6cef31559..147f3e8d7200 100644 --- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -76,7 +76,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op, const TActorContext &ctx) { TWriteOperation* writeOp = CastWriteOperation(op); - if (writeOp->WriteResult()) + if (writeOp->GetWriteResult()) UpdateCounters(writeOp, ctx); bool hadWrites = false; @@ -137,7 +137,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext if (!op->HasResultSentFlag()) { DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); - if (writeOp->WriteResult()) + if (writeOp->GetWriteResult()) CompleteRequest(op, ctx); } @@ -156,7 +156,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorContext &ctx) { TWriteOperation* writeOp = CastWriteOperation(op); - auto res = writeOp->WriteResult(); + auto res = writeOp->ReleaseWriteResult(); TDuration duration = TAppData::TimeProvider->Now() - op->GetReceivedAt(); @@ -208,7 +208,7 @@ void TFinishProposeWriteUnit::AddDiagnosticsResult(NEvents::TDataEvents::TEvWrit void TFinishProposeWriteUnit::UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx) { - const auto& res = writeOp->WriteResult(); + const auto& res = writeOp->GetWriteResult(); auto execLatency = TAppData::TimeProvider->Now() - writeOp->GetReceivedAt(); DataShard.IncCounter(COUNTER_PREPARE_EXEC_LATENCY, execLatency); if (res->IsPrepared()) { diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index f088a52420ce..3b76d38aabfc 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -36,13 +36,13 @@ class TWriteUnit : public TExecutionUnit { } void DoExecute(TDataShard* self, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) { - const TValidatedWriteTx::TPtr& writeTx = writeOp->WriteTx(); + const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); - const ui64 tableId = writeTx->TableId().PathId.LocalPathId; + const ui64 tableId = writeTx->GetTableId().PathId.LocalPathId; const TTableId fullTableId(self->GetPathOwnerId(), tableId); const ui64 localTableId = self->GetLocalTableId(fullTableId); if (localTableId == 0) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId, self->TabletID()); return; } const ui64 shadowTableId = self->GetShadowTableId(fullTableId); @@ -62,7 +62,7 @@ class TWriteUnit : public TExecutionUnit { TVector keyCells; - const TSerializedCellMatrix& matrix = writeTx->Matrix(); + const TSerializedCellMatrix& matrix = writeTx->GetMatrix(); for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) { @@ -73,7 +73,7 @@ class TWriteUnit : public TExecutionUnit { const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx]; const TCell& cell = matrix.GetCell(rowIdx, keyColIdx); if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue() > 127) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited"); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited", self->TabletID()); return; } @@ -83,7 +83,7 @@ class TWriteUnit : public TExecutionUnit { } if (keyBytes > NLimits::MaxWriteKeySize) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize, self->TabletID()); return; } @@ -92,7 +92,7 @@ class TWriteUnit : public TExecutionUnit { ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); const TCell& cell = matrix.GetCell(rowIdx, valueColIdx); if (cell.Size() > NLimits::MaxWriteValueSize) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize, self->TabletID()); return; } @@ -111,16 +111,16 @@ class TWriteUnit : public TExecutionUnit { TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now(); - writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(writeTx->TabletId(), writeOp->GetTxId())); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(self->TabletID(), writeOp->GetTxId())); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << writeOp->WriteTx()->TabletId()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << self->TabletID()); } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { TWriteOperation* writeOp = dynamic_cast(op.Get()); Y_ABORT_UNLESS(writeOp != nullptr); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executing write operation for " << *op << " at " << writeOp->WriteTx()->TabletId()); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executing write operation for " << *op << " at " << DataShard.TabletID()); if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) { return EExecutionStatus::Executed; @@ -136,7 +136,7 @@ class TWriteUnit : public TExecutionUnit { } else { //TODO: Prepared - writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {0, 0, {}})); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(DataShard.TabletID(), op->GetTxId(), {0, 0, {}})); return EExecutionStatus::DelayCompleteNoMoreRestarts; } @@ -169,7 +169,7 @@ class TWriteUnit : public TExecutionUnit { return EExecutionStatus::Continue; } - op->ChangeRecords() = std::move(writeOp->WriteTx()->GetCollectedChanges()); + op->ChangeRecords() = std::move(writeOp->GetWriteTx()->GetCollectedChanges()); DataShard.SysLocksTable().ApplyLocks(); DataShard.SubscribeNewLocks(ctx); @@ -186,8 +186,8 @@ class TWriteUnit : public TExecutionUnit { TWriteOperation* writeOp = dynamic_cast(op.Get()); Y_ABORT_UNLESS(writeOp != nullptr); - const auto& status = writeOp->WriteResult()->Record.status(); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << writeOp->WriteTx()->TabletId() << ", status " << status); + const auto& status = writeOp->GetWriteResult()->Record.status(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << DataShard.TabletID() << ", status " << status); //TODO: Counters // if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) { @@ -196,7 +196,7 @@ class TWriteUnit : public TExecutionUnit { // self->IncCounter(COUNTER_WRITE_ERROR); // } - ctx.Send(writeOp->GetEv()->Sender, writeOp->WriteResult().release(), 0, writeOp->GetEv()->Cookie); + ctx.Send(writeOp->GetEv()->Sender, writeOp->ReleaseWriteResult().release(), 0, writeOp->GetEv()->Cookie); } }; // TWriteUnit