From 982e9c9871a05e6b73b88d20ce66b3502420f395 Mon Sep 17 00:00:00 2001 From: azevaykin Date: Thu, 28 Dec 2023 09:00:03 +0000 Subject: [PATCH] Move size checks to the parse phase --- ydb/core/tx/datashard/check_write_unit.cpp | 36 ----------------- ydb/core/tx/datashard/datashard__write.cpp | 5 ++- ydb/core/tx/datashard/datashard_pipeline.cpp | 6 +-- ydb/core/tx/datashard/datashard_ut_write.cpp | 2 +- .../datashard/datashard_write_operation.cpp | 40 +++++++++++++++++-- .../tx/datashard/datashard_write_operation.h | 13 +++--- .../datashard/finish_propose_write_unit.cpp | 15 ++----- ydb/core/tx/datashard/write_unit.cpp | 18 ++------- 8 files changed, 57 insertions(+), 78 deletions(-) diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp index b02c3f0e52fd..d91a3cbdae22 100644 --- a/ydb/core/tx/datashard/check_write_unit.cpp +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -77,42 +77,6 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, { for (const auto& key : writeTx->TxInfo().Keys) { if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) { - ui64 keySize = 0; - for (const auto& cell : key.Key->Range.From) { - keySize += cell.Size(); - } - if (keySize > NLimits::MaxWriteKeySize) { - TString err = TStringBuilder() - << "Operation " << *op << " writes key of " << keySize - << " bytes which exceeds limit " << NLimits::MaxWriteKeySize - << " bytes at " << DataShard.TabletID(); - - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID()); - op->Abort(EExecutionUnitKind::FinishProposeWrite); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } - for (const auto& col : key.Key->Columns) { - if (col.Operation == TKeyDesc::EColumnOperation::Set || - col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate) - { - if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) { - TString err = TStringBuilder() - << "Transaction write column value of " << col.ImmediateUpdateSize - << " bytes is larger than the allowed threshold"; - - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID()); - op->Abort(EExecutionUnitKind::FinishProposeWrite); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } - } - } - if (DataShard.IsSubDomainOutOfSpace()) { switch (key.Key->RowOperation) { case TKeyDesc::ERowOperation::Read: diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp index 581ee461bdb5..87f6b9a2d126 100644 --- a/ydb/core/tx/datashard/datashard__write.cpp +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -72,16 +72,17 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext } TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId()); + TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); // Unsuccessful operation parse. if (op->IsAborted()) { LWTRACK(ProposeTransactionParsed, op->Orbit, false); - Y_ABORT_UNLESS(op->Result()); + Y_ABORT_UNLESS(writeOp->GetWriteResult()); if (ProposeTransactionSpan) { ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse"); } - ctx.Send(op->GetTarget(), op->Result().Release()); + ctx.Send(op->GetTarget(), writeOp->ReleaseWriteResult().release()); return true; } LWTRACK(ProposeTransactionParsed, op->Orbit, true); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 4457cac04e51..9e95d424f71a 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1571,12 +1571,12 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& writeOp->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END); auto badRequest = [&](const TString& error) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID()); LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error); }; if (!writeTx->Ready()) { - badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << writeOp->GetTxId() << ": " << writeOp->GetWriteTx()->GetError()); + badRequest(TStringBuilder() << "Cannot parse tx " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr()); return writeOp; } @@ -1599,7 +1599,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr& // Make config checks for immediate op. if (writeOp->IsImmediate()) { if (Config.NoImmediate() || (Config.ForceOnlineRW())) { - LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate writeOp " << writeOp->GetTxId() << " to online according to config"); + LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Force immediate writeOp " << writeOp->GetTxId() << " to online according to config, at tablet #" << Self->TabletID()); writeOp->SetForceOnlineFlag(); } else { if (Config.DirtyImmediate()) diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index a209980ad837..3ff5636e1f99 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1); - UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes")); + UNIT_ASSERT(record.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600")); } Y_UNIT_TEST(WriteOnShard) { diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 2cd3af217566..0ed98c2363f8 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -129,6 +129,36 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) { } } + for (ui32 rowIdx = 0; rowIdx < Matrix.GetRowCount(); ++rowIdx) + { + ui64 keyBytes = 0; + for (ui16 keyColIdx = 0; keyColIdx < TableInfo->KeyColumnIds.size(); ++keyColIdx) { + const auto& cellType = TableInfo->KeyColumnTypes[keyColIdx]; + const TCell& cell = Matrix.GetCell(rowIdx, keyColIdx); + if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue() > 127) { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Keys with Uint8 column values >127 are currently prohibited"; + return false; + } + keyBytes += cell.IsNull() ? 1 : cell.Size(); + } + + if (keyBytes > NLimits::MaxWriteKeySize) { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize; + return false; + } + + for (ui16 valueColIdx = TableInfo->KeyColumnIds.size(); valueColIdx < Matrix.GetColCount(); ++valueColIdx) { + const TCell& cell = Matrix.GetCell(rowIdx, valueColIdx); + if (cell.Size() > NLimits::MaxWriteValueSize) { + ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; + ErrStr = TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize; + return false; + } + } + } + TableId = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion()); return true; } @@ -213,6 +243,13 @@ void TValidatedWriteTx::ComputeTxSize() { TxSize = sizeof(TValidatedWriteTx); } +TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op) +{ + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_ABORT_UNLESS(writeOp); + return writeOp; +} + TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx) : TOperation(op) , Ev(ev) @@ -228,9 +265,6 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T BuildWriteTx(self, txc, ctx); - // First time parsing, so we can fail - Y_DEBUG_ABORT_UNLESS(WriteTx->Ready()); - TrackMemory(); } diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 0e3316aa6c4f..4737c3ada7a1 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -26,13 +26,6 @@ class TValidatedWriteTx: TNonCopyable { return 100; } - NKikimrTxDataShard::TError::EKind Code() const { - return ErrCode; - } - const TString GetError() const { - return ErrStr; - } - const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const { return Ev; } @@ -43,6 +36,7 @@ class TValidatedWriteTx: TNonCopyable { const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const { Y_ABORT_UNLESS(GetRecord().operations().size() == 1, "Only one operation is supported now"); + Y_ABORT_UNLESS(GetRecord().operations(0).GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, "Only UPSERT operation is supported now"); return GetRecord().operations(0); } @@ -199,6 +193,8 @@ class TValidatedWriteTx: TNonCopyable { class TWriteOperation : public TOperation { friend class TWriteUnit; public: + static TWriteOperation* CastWriteOperation(TOperation::TPtr op); + explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); ~TWriteOperation(); @@ -316,6 +312,9 @@ class TWriteOperation : public TOperation { const TValidatedWriteTx::TPtr& GetWriteTx() const { return WriteTx; } + TValidatedWriteTx::TPtr& GetWriteTx() { + return WriteTx; + } TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx); void ClearWriteTx() { diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp index 147f3e8d7200..a37693690a15 100644 --- a/ydb/core/tx/datashard/finish_propose_write_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -25,7 +25,7 @@ class TFinishProposeWriteUnit : public TExecutionUnit { void AddDiagnosticsResult(NEvents::TDataEvents::TEvWriteResult& res); void UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx); - static TWriteOperation* CastWriteOperation(TOperation::TPtr op); + }; TFinishProposeWriteUnit::TFinishProposeWriteUnit(TDataShard &dataShard, @@ -38,13 +38,6 @@ TFinishProposeWriteUnit::~TFinishProposeWriteUnit() { } -TWriteOperation* TFinishProposeWriteUnit::CastWriteOperation(TOperation::TPtr op) -{ - TWriteOperation* writeOp = dynamic_cast(op.Get()); - Y_ABORT_UNLESS(writeOp); - return writeOp; -} - bool TFinishProposeWriteUnit::IsReadyToExecute(TOperation::TPtr) const { return true; @@ -75,7 +68,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) { - TWriteOperation* writeOp = CastWriteOperation(op); + TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); if (writeOp->GetWriteResult()) UpdateCounters(writeOp, ctx); @@ -132,7 +125,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op, void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx) { - TWriteOperation* writeOp = CastWriteOperation(op); + TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); if (!op->HasResultSentFlag()) { DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); @@ -155,7 +148,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorContext &ctx) { - TWriteOperation* writeOp = CastWriteOperation(op); + TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op); auto res = writeOp->ReleaseWriteResult(); TDuration duration = TAppData::TimeProvider->Now() - op->GetReceivedAt(); diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 3b76d38aabfc..641a23124de6 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -53,6 +53,8 @@ class TWriteUnit : public TExecutionUnit { const ui32 writeTableId = localTableId; auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp); + writeTx->SetReadVersion(readVersion); + writeTx->SetWriteVersion(writeVersion); TDataShardUserDb userDb(*self, txc.DB, readVersion); TDataShardChangeGroupProvider groupProvider(*self, txc.DB); @@ -68,34 +70,20 @@ class TWriteUnit : public TExecutionUnit { { key.clear(); keyCells.clear(); + keyCells.reserve(TableInfo_.KeyColumnIds.size()); ui64 keyBytes = 0; for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) { const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx]; const TCell& cell = matrix.GetCell(rowIdx, keyColIdx); - if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue() > 127) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited", self->TabletID()); - return; - } - keyBytes += cell.Size(); key.emplace_back(TRawTypeValue(cell.AsRef(), cellType)); keyCells.emplace_back(cell); } - if (keyBytes > NLimits::MaxWriteKeySize) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize, self->TabletID()); - return; - } - value.clear(); for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) { ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); const TCell& cell = matrix.GetCell(rowIdx, valueColIdx); - if (cell.Size() > NLimits::MaxWriteValueSize) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize, self->TabletID()); - return; - } - auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1); Y_ABORT_UNLESS(col);