Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,42 +77,6 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
{
for (const auto& key : writeTx->TxInfo().Keys) {
if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
ui64 keySize = 0;
for (const auto& cell : key.Key->Range.From) {
keySize += cell.Size();
}
if (keySize > NLimits::MaxWriteKeySize) {
TString err = TStringBuilder()
<< "Operation " << *op << " writes key of " << keySize
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
<< " bytes at " << DataShard.TabletID();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}
for (const auto& col : key.Key->Columns) {
if (col.Operation == TKeyDesc::EColumnOperation::Set ||
col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate)
{
if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) {
TString err = TStringBuilder()
<< "Transaction write column value of " << col.ImmediateUpdateSize
<< " bytes is larger than the allowed threshold";

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}
}
}

if (DataShard.IsSubDomainOutOfSpace()) {
switch (key.Key->RowOperation) {
case TKeyDesc::ERowOperation::Read:
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/datashard/datashard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
}

TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId());
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

// Unsuccessful operation parse.
if (op->IsAborted()) {
LWTRACK(ProposeTransactionParsed, op->Orbit, false);
Y_ABORT_UNLESS(op->Result());
Y_ABORT_UNLESS(writeOp->GetWriteResult());

if (ProposeTransactionSpan) {
ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse");
}
ctx.Send(op->GetTarget(), op->Result().Release());
ctx.Send(op->GetTarget(), writeOp->ReleaseWriteResult().release());
return true;
}
LWTRACK(ProposeTransactionParsed, op->Orbit, true);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1571,12 +1571,12 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
writeOp->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END);

auto badRequest = [&](const TString& error) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID());
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};

if (!writeTx->Ready()) {
badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << writeOp->GetTxId() << ": " << writeOp->GetWriteTx()->GetError());
badRequest(TStringBuilder() << "Cannot parse tx " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr());
return writeOp;
}

Expand All @@ -1599,7 +1599,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
// Make config checks for immediate op.
if (writeOp->IsImmediate()) {
if (Config.NoImmediate() || (Config.ForceOnlineRW())) {
LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate writeOp " << writeOp->GetTxId() << " to online according to config");
LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Force immediate writeOp " << writeOp->GetTxId() << " to online according to config, at tablet #" << Self->TabletID());
writeOp->SetForceOnlineFlag();
} else {
if (Config.DirtyImmediate())
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);

UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
UNIT_ASSERT(record.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600"));
}

Y_UNIT_TEST(WriteOnShard) {
Expand Down
40 changes: 37 additions & 3 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,36 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
}
}

for (ui32 rowIdx = 0; rowIdx < Matrix.GetRowCount(); ++rowIdx)
{
ui64 keyBytes = 0;
for (ui16 keyColIdx = 0; keyColIdx < TableInfo->KeyColumnIds.size(); ++keyColIdx) {
const auto& cellType = TableInfo->KeyColumnTypes[keyColIdx];
const TCell& cell = Matrix.GetCell(rowIdx, keyColIdx);
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
ErrStr = TStringBuilder() << "Keys with Uint8 column values >127 are currently prohibited";
return false;
}
keyBytes += cell.IsNull() ? 1 : cell.Size();
}

if (keyBytes > NLimits::MaxWriteKeySize) {
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
ErrStr = TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize;
return false;
}

for (ui16 valueColIdx = TableInfo->KeyColumnIds.size(); valueColIdx < Matrix.GetColCount(); ++valueColIdx) {
const TCell& cell = Matrix.GetCell(rowIdx, valueColIdx);
if (cell.Size() > NLimits::MaxWriteValueSize) {
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
ErrStr = TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize;
return false;
}
}
}

TableId = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion());
return true;
}
Expand Down Expand Up @@ -213,6 +243,13 @@ void TValidatedWriteTx::ComputeTxSize() {
TxSize = sizeof(TValidatedWriteTx);
}

TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op)
{
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
Y_ABORT_UNLESS(writeOp);
return writeOp;
}

TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
: TOperation(op)
, Ev(ev)
Expand All @@ -228,9 +265,6 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T

BuildWriteTx(self, txc, ctx);

// First time parsing, so we can fail
Y_DEBUG_ABORT_UNLESS(WriteTx->Ready());

TrackMemory();
}

Expand Down
13 changes: 6 additions & 7 deletions ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ class TValidatedWriteTx: TNonCopyable {
return 100;
}

NKikimrTxDataShard::TError::EKind Code() const {
return ErrCode;
}
const TString GetError() const {
return ErrStr;
}

const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const {
return Ev;
}
Expand All @@ -43,6 +36,7 @@ class TValidatedWriteTx: TNonCopyable {

const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const {
Y_ABORT_UNLESS(GetRecord().operations().size() == 1, "Only one operation is supported now");
Y_ABORT_UNLESS(GetRecord().operations(0).GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, "Only UPSERT operation is supported now");
return GetRecord().operations(0);
}

Expand Down Expand Up @@ -199,6 +193,8 @@ class TValidatedWriteTx: TNonCopyable {
class TWriteOperation : public TOperation {
friend class TWriteUnit;
public:
static TWriteOperation* CastWriteOperation(TOperation::TPtr op);

explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);

~TWriteOperation();
Expand Down Expand Up @@ -316,6 +312,9 @@ class TWriteOperation : public TOperation {
const TValidatedWriteTx::TPtr& GetWriteTx() const {
return WriteTx;
}
TValidatedWriteTx::TPtr& GetWriteTx() {
return WriteTx;
}
TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);

void ClearWriteTx() {
Expand Down
15 changes: 4 additions & 11 deletions ydb/core/tx/datashard/finish_propose_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TFinishProposeWriteUnit : public TExecutionUnit {
void AddDiagnosticsResult(NEvents::TDataEvents::TEvWriteResult& res);
void UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx);

static TWriteOperation* CastWriteOperation(TOperation::TPtr op);

};

TFinishProposeWriteUnit::TFinishProposeWriteUnit(TDataShard &dataShard,
Expand All @@ -38,13 +38,6 @@ TFinishProposeWriteUnit::~TFinishProposeWriteUnit()
{
}

TWriteOperation* TFinishProposeWriteUnit::CastWriteOperation(TOperation::TPtr op)
{
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
Y_ABORT_UNLESS(writeOp);
return writeOp;
}

bool TFinishProposeWriteUnit::IsReadyToExecute(TOperation::TPtr) const
{
return true;
Expand Down Expand Up @@ -75,7 +68,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,
TTransactionContext &txc,
const TActorContext &ctx)
{
TWriteOperation* writeOp = CastWriteOperation(op);
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
if (writeOp->GetWriteResult())
UpdateCounters(writeOp, ctx);

Expand Down Expand Up @@ -132,7 +125,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,

void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)
{
TWriteOperation* writeOp = CastWriteOperation(op);
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);

if (!op->HasResultSentFlag()) {
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
Expand All @@ -155,7 +148,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext

void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorContext &ctx)
{
TWriteOperation* writeOp = CastWriteOperation(op);
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
auto res = writeOp->ReleaseWriteResult();

TDuration duration = TAppData::TimeProvider->Now() - op->GetReceivedAt();
Expand Down
18 changes: 3 additions & 15 deletions ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TWriteUnit : public TExecutionUnit {

const ui32 writeTableId = localTableId;
auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp);
writeTx->SetReadVersion(readVersion);
writeTx->SetWriteVersion(writeVersion);

TDataShardUserDb userDb(*self, txc.DB, readVersion);
TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
Expand All @@ -68,34 +70,20 @@ class TWriteUnit : public TExecutionUnit {
{
key.clear();
keyCells.clear();
keyCells.reserve(TableInfo_.KeyColumnIds.size());
ui64 keyBytes = 0;
for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited", self->TabletID());
return;
}

keyBytes += cell.Size();
key.emplace_back(TRawTypeValue(cell.AsRef(), cellType));
keyCells.emplace_back(cell);
}

if (keyBytes > NLimits::MaxWriteKeySize) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize, self->TabletID());
return;
}

value.clear();
for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
if (cell.Size() > NLimits::MaxWriteValueSize) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize, self->TabletID());
return;
}

auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1);
Y_ABORT_UNLESS(col);

Expand Down