diff --git a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp index 9415968e7e21..40bb415c4473 100644 --- a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp @@ -103,8 +103,10 @@ class TBuildDistributedEraseTxOutRSUnit : public TExecutionUnit { condition->Prepare(txc.DB.GetRowScheme(tableInfo.LocalTid), 0); const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds()); - auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion; - TDataShardUserDb userDb(DataShard, txc.DB, readVersion); + auto now = TAppData::TimeProvider->Now(); + auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(tx); + NMiniKQL::TEngineHostCounters engineHostCounters; + TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, now); bool pageFault = false; TDynBitMap confirmedRows; diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index deaed0d1e29c..086dcffd3a6e 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -199,7 +199,7 @@ class TDataShardEngineHost final , public IDataShardChangeGroupProvider { public: - TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) + TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, ui64 globalTxId, TEngineHostCounters& counters, TInstant now) : TEngineHost(db, counters, TEngineHostSettings(self->TabletID(), (self->State == TShardState::Readonly || self->State == TShardState::Frozen || self->IsReplicated()), @@ -207,10 +207,7 @@ class TDataShardEngineHost final self->GetKeyAccessSampler())) , Self(self) , EngineBay(engineBay) - , DB(db) 
- , LockTxId(lockTxId) - , LockNodeId(lockNodeId) - , Now(now) + , UserDb(*self, db, globalTxId, TRowVersion::Min(), TRowVersion::Max(), counters, now) { } @@ -222,14 +219,7 @@ class TDataShardEngineHost final NTable::TSelectStats& stats, const TMaybe& readVersion) override { - auto tid = LocalTableId(tableId); - - return DB.Select( - tid, key, tags, row, stats, - /* readFlags */ 0, - readVersion.GetOrElse(ReadVersion), - GetReadTxMap(tableId), - GetReadTxObserver(tableId)); + return UserDb.SelectRow(tableId, key, tags, row, stats, readVersion); } NTable::EReady SelectRow( @@ -239,197 +229,84 @@ class TDataShardEngineHost final NTable::TRowState& row, const TMaybe& readVersion) override { - NTable::TSelectStats stats; - return SelectRow(tableId, key, tags, row, stats, readVersion); + return UserDb.SelectRow(tableId, key, tags, row, readVersion); } void SetWriteVersion(TRowVersion writeVersion) { - WriteVersion = writeVersion; + UserDb.SetWriteVersion(writeVersion); } TRowVersion GetWriteVersion(const TTableId& tableId) const override { Y_UNUSED(tableId); - Y_ABORT_UNLESS(!WriteVersion.IsMax(), "Cannot perform writes without WriteVersion set"); - return WriteVersion; + Y_ABORT_UNLESS(!UserDb.GetWriteVersion().IsMax(), "Cannot perform writes without WriteVersion set"); + return UserDb.GetWriteVersion(); } void SetReadVersion(TRowVersion readVersion) { - ReadVersion = readVersion; + UserDb.SetReadVersion(readVersion); } TRowVersion GetReadVersion(const TTableId& tableId) const override { Y_UNUSED(tableId); - Y_ABORT_UNLESS(!ReadVersion.IsMin(), "Cannot perform reads without ReadVersion set"); - return ReadVersion; + Y_ABORT_UNLESS(!UserDb.GetReadVersion().IsMin(), "Cannot perform reads without ReadVersion set"); + return UserDb.GetReadVersion(); } void SetVolatileTxId(ui64 txId) { - VolatileTxId = txId; + UserDb.SetVolatileTxId(txId); } void SetIsImmediateTx() { - IsImmediateTx = true; + UserDb.SetIsImmediateTx(true); } void SetIsRepeatableSnapshot() { - 
IsRepeatableSnapshot = true; + UserDb.SetIsRepeatableSnapshot(true); } std::optional GetCurrentChangeGroup() const override { - return ChangeGroup; + return UserDb.GetCurrentChangeGroup(); } ui64 GetChangeGroup() override { - if (!ChangeGroup) { - if (IsImmediateTx) { - NIceDb::TNiceDb db(DB); - ChangeGroup = Self->AllocateChangeRecordGroup(db); - } else { - // Distributed transactions have their group set to zero - ChangeGroup = 0; - } - } - - return *ChangeGroup; + return UserDb.GetChangeGroup(); } IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override { - auto it = ChangeCollectors.find(tableId.PathId); - if (it != ChangeCollectors.end()) { - return it->second.Get(); - } - - it = ChangeCollectors.emplace(tableId.PathId, nullptr).first; - if (!Self->IsUserTable(tableId)) { - return it->second.Get(); - } - - it->second.Reset(CreateChangeCollector( - *Self, - *const_cast(this), - *const_cast(this), - DB, - tableId.PathId.LocalPathId)); - return it->second.Get(); + return UserDb.GetChangeCollector(tableId); } void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) { - auto localTid = Self->GetLocalTableId(tableId); - Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self->TabletID()); - - if (!DB.HasOpenTx(localTid, lockId)) { - return; - } - - if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) { - lock->ForAllVolatileDependencies([this](ui64 txId) { - auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId); - if (info && info->State != EVolatileTxState::Aborting) { - if (VolatileDependencies.insert(txId).second && !VolatileTxId) { - VolatileTxId = EngineBay.GetTxId(); - } - } - }); - } - - if (VolatileTxId) { - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, - "Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); - if 
(VolatileCommitTxIds.insert(lockId).second) { - // Update TxMap to include the new commit - auto it = TxMaps.find(tableId.PathId); - if (it != TxMaps.end()) { - it->second->Add(lockId, WriteVersion); - } - } - return; - } - - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, - "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); - DB.CommitTx(localTid, lockId, writeVersion); - Self->GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(lockId, DB); - - if (!CommittedLockChanges.contains(lockId) && Self->HasLockChangeRecords(lockId)) { - if (auto* collector = GetChangeCollector(tableId)) { - collector->CommitLockChanges(lockId, WriteVersion); - CommittedLockChanges.insert(lockId); - } - } + UserDb.CommitChanges(tableId, lockId, writeVersion); } TVector GetCollectedChanges() const { - TVector total; - - for (auto& [_, collector] : ChangeCollectors) { - if (!collector) { - continue; - } - - auto collected = std::move(collector->GetCollected()); - std::move(collected.begin(), collected.end(), std::back_inserter(total)); - } - - return total; + return UserDb.GetCollectedChanges(); } void ResetCollectedChanges() { - for (auto& pr : ChangeCollectors) { - if (pr.second) { - pr.second->OnRestart(); - } - } + UserDb.ResetCollectedChanges(); } TVector GetVolatileCommitTxIds() const { - TVector commitTxIds; - - if (!VolatileCommitTxIds.empty()) { - commitTxIds.reserve(VolatileCommitTxIds.size()); - for (ui64 commitTxId : VolatileCommitTxIds) { - commitTxIds.push_back(commitTxId); - } - } - - return commitTxIds; + return UserDb.GetVolatileCommitTxIds(); } const absl::flat_hash_set& GetVolatileDependencies() const { - return VolatileDependencies; + return UserDb.GetVolatileDependencies(); } std::optional GetVolatileChangeGroup() const { - return ChangeGroup; + return UserDb.GetChangeGroup(); } bool GetVolatileCommitOrdered() const { - return VolatileCommitOrdered; + return 
UserDb.GetVolatileCommitOrdered(); } bool IsValidKey(TKeyDesc& key) const override { - if (TSysTables::IsSystemTable(key.TableId)) - return DataShardSysTable(key.TableId).IsValidKey(key); - - ui64 localTableId = Self->GetLocalTableId(key.TableId); - Y_ABORT_UNLESS(localTableId != 0, "Unexpected IsValidKey for an unknown table"); - - if (LockTxId) { - // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs - if (key.RowOperation != TKeyDesc::ERowOperation::Read && - (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot || !LockNodeId)) - { - key.Status = TKeyDesc::EStatus::OperationNotSupported; - return false; - } - } else if (IsRepeatableSnapshot) { - // Prevent updates/erases in repeatable mvcc txs - if (key.RowOperation != TKeyDesc::ERowOperation::Read) { - key.Status = TKeyDesc::EStatus::OperationNotSupported; - return false; - } - } - - return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key); + TKeyValidator::TValidateOptions options(UserDb); + return GetKeyValidator().IsValidKey(key, options); } NUdf::TUnboxedValue SelectRow(const TTableId& tableId, const TArrayRef& row, @@ -440,11 +317,11 @@ class TDataShardEngineHost final return DataShardSysTable(tableId).SelectRow(row, columnIds, returnType, readTarget, holderFactory); } - if (LockTxId) { + if (UserDb.GetLockTxId()) { Self->SysLocksTable().SetLock(tableId, row); } - Self->SetTableAccessTime(tableId, Now); + Self->SetTableAccessTime(tableId, UserDb.GetNow()); return TEngineHost::SelectRow(tableId, row, columnIds, returnType, readTarget, holderFactory); } @@ -455,11 +332,11 @@ class TDataShardEngineHost final { Y_ABORT_UNLESS(!TSysTables::IsSystemTable(tableId), "SelectRange no system table is not supported"); - if (LockTxId) { + if (UserDb.GetLockTxId()) { Self->SysLocksTable().SetLock(tableId, range); } - Self->SetTableAccessTime(tableId, Now); + Self->SetTableAccessTime(tableId, UserDb.GetNow()); return TEngineHost::SelectRange(tableId, range, 
columnIds, skipNullKeys, returnType, readTarget, itemsLimit, bytesLimit, reverse, forbidNullArgs, holderFactory); } @@ -470,64 +347,19 @@ class TDataShardEngineHost final return; } - CheckWriteConflicts(tableId, row); + const NTable::TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(LocalTableId(tableId)); - if (LockTxId) { - Self->SysLocksTable().SetWriteLock(tableId, row); - } else { - Self->SysLocksTable().BreakLocks(tableId, row); - } - Self->SetTableUpdateTime(tableId, Now); - - // apply special columns if declared - TUserTable::TSpecialUpdate specUpdates = Self->SpecialUpdates(DB, tableId); - if (specUpdates.HasUpdates) { - TStackVec extendedCmds; - extendedCmds.reserve(commands.size() + 3); - for (const TUpdateCommand& cmd : commands) { - if (cmd.Column == specUpdates.ColIdTablet) - specUpdates.ColIdTablet = Max(); - else if (cmd.Column == specUpdates.ColIdEpoch) - specUpdates.ColIdEpoch = Max(); - else if (cmd.Column == specUpdates.ColIdUpdateNo) - specUpdates.ColIdUpdateNo = Max(); - - extendedCmds.push_back(cmd); - } - - if (specUpdates.ColIdTablet != Max()) { - extendedCmds.emplace_back(TUpdateCommand{ - specUpdates.ColIdTablet, TKeyDesc::EColumnOperation::Set, EInplaceUpdateMode::Unknown, - TCell::Make(specUpdates.Tablet) - }); - } - - if (specUpdates.ColIdEpoch != Max()) { - extendedCmds.emplace_back(TUpdateCommand{ - specUpdates.ColIdEpoch, TKeyDesc::EColumnOperation::Set, EInplaceUpdateMode::Unknown, - TCell::Make(specUpdates.Epoch) - }); - } + TSmallVec key; + ConvertTableKeys(Scheme, tableInfo, row, key, nullptr); - if (specUpdates.ColIdUpdateNo != Max()) { - extendedCmds.emplace_back(TUpdateCommand{ - specUpdates.ColIdUpdateNo, TKeyDesc::EColumnOperation::Set, EInplaceUpdateMode::Unknown, - TCell::Make(specUpdates.UpdateNo) - }); - } + TSmallVec ops; + ConvertTableValues(Scheme, tableInfo, commands, ops, nullptr); - TEngineHost::UpdateRow(tableId, row, {extendedCmds.data(), extendedCmds.size()}); - } else { - TEngineHost::UpdateRow(tableId, 
row, commands); - } + UserDb.UpdateRow(tableId, key, ops); + } - if (VolatileTxId) { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db); - } else if (LockTxId) { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db); - } else { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db); - } + void UpdateRow(const TTableId& tableId, const TArrayRef key, const TArrayRef ops) override { + UserDb.UpdateRow(tableId, key, ops); } void EraseRow(const TTableId& tableId, const TArrayRef& row) override { @@ -536,26 +368,19 @@ class TDataShardEngineHost final return; } - CheckWriteConflicts(tableId, row); - - if (LockTxId) { - Self->SysLocksTable().SetWriteLock(tableId, row); - } else { - Self->SysLocksTable().BreakLocks(tableId, row); - } + const NTable::TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(LocalTableId(tableId)); - Self->SetTableUpdateTime(tableId, Now); - TEngineHost::EraseRow(tableId, row); + TSmallVec key; + ConvertTableKeys(Scheme, tableInfo, row, key, nullptr); - if (VolatileTxId) { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db); - } else if (LockTxId) { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db); - } else { - Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db); - } + UserDb.EraseRow(tableId, key); } + void EraseRow(const TTableId& tableId, const TArrayRef key) override + { + UserDb.EraseRow(tableId, key); + } + // Returns whether row belong this shard. 
bool IsMyKey(const TTableId& tableId, const TArrayRef& row) const override { if (TSysTables::IsSystemTable(tableId)) @@ -578,7 +403,7 @@ class TDataShardEngineHost final return TDataShardEngineHost::LocalTableId(tableId) == 0; } - ui64 LocalTableId(const TTableId &tableId) const override { + ui64 LocalTableId(const TTableId& tableId) const override { return Self->GetLocalTableId(tableId); } @@ -592,19 +417,7 @@ class TDataShardEngineHost final if (TSysTables::IsSystemTable(tableId)) return 0; - if (VolatileTxId) { - Y_ABORT_UNLESS(!LockTxId); - if (VolatileCommitTxIds.insert(VolatileTxId).second) { - // Update TxMap to include the new commit - auto it = TxMaps.find(tableId.PathId); - if (it != TxMaps.end()) { - it->second->Add(VolatileTxId, WriteVersion); - } - } - return VolatileTxId; - } - - return LockTxId; + return UserDb.GetWriteTxId(tableId); } NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const override { @@ -612,380 +425,23 @@ class TDataShardEngineHost final return nullptr; } - auto baseTxMap = Self->GetVolatileTxManager().GetTxMap(); - - bool needTxMap = ( - // We need tx map when there are waiting volatile transactions - baseTxMap || - // We need tx map to see committed volatile tx changes - VolatileTxId && !VolatileCommitTxIds.empty() || - // We need tx map when current lock has uncommitted changes - LockTxId && Self->SysLocksTable().HasCurrentWriteLock(tableId)); - - if (!needTxMap) { - // We don't need tx map - return nullptr; - } - - auto& ptr = TxMaps[tableId.PathId]; - if (!ptr) { - ptr = new NTable::TDynamicTransactionMap(baseTxMap); - if (LockTxId) { - // Uncommitted changes are visible in all possible snapshots - ptr->Add(LockTxId, TRowVersion::Min()); - } else if (VolatileTxId) { - // We want committed volatile changes to be visible at the write version - for (ui64 commitTxId : VolatileCommitTxIds) { - ptr->Add(commitTxId, WriteVersion); - } - } - } - - return ptr; + return UserDb.GetReadTxMap(tableId); } 
NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override { if (TSysTables::IsSystemTable(tableId)) return nullptr; - bool needObserver = ( - // We need observer when there are waiting changes in the tx map - Self->GetVolatileTxManager().GetTxMap() || - // We need observer for locked reads when there are active write locks - LockTxId && Self->SysLocksTable().HasWriteLocks(tableId)); - - if (!needObserver) { - // We don't need tx observer - return nullptr; - } - - auto& ptr = TxObservers[tableId.PathId]; - if (!ptr) { - if (LockTxId) { - ptr = new TLockedReadTxObserver(this); - } else { - ptr = new TReadTxObserver(this); - } - } - - return ptr; - } - - class TLockedReadTxObserver : public NTable::ITransactionObserver { - public: - TLockedReadTxObserver(const TDataShardEngineHost* host) - : Host(host) - { } - - void OnSkipUncommitted(ui64 txId) override { - Host->AddReadConflict(txId); - } - - void OnSkipCommitted(const TRowVersion&) override { - // We already use InvisibleRowSkips for these - } - - void OnSkipCommitted(const TRowVersion&, ui64) override { - // We already use InvisibleRowSkips for these - } - - void OnApplyCommitted(const TRowVersion& rowVersion) override { - Host->CheckReadConflict(rowVersion); - } - - void OnApplyCommitted(const TRowVersion& rowVersion, ui64 txId) override { - Host->CheckReadConflict(rowVersion); - Host->CheckReadDependency(txId); - } - - private: - const TDataShardEngineHost* const Host; - }; - - class TReadTxObserver : public NTable::ITransactionObserver { - public: - TReadTxObserver(const TDataShardEngineHost* host) - : Host(host) - { } - - void OnSkipUncommitted(ui64) override { - // We don't care about uncommitted changes - // Any future commit is supposed to be above our read version - } - - void OnSkipCommitted(const TRowVersion&) override { - // We already use InvisibleRowSkips for these - } - - void OnSkipCommitted(const TRowVersion&, ui64) override { - // We already use InvisibleRowSkips 
for these - } - - void OnApplyCommitted(const TRowVersion&) override { - // Not needed - } - - void OnApplyCommitted(const TRowVersion&, ui64 txId) override { - Host->CheckReadDependency(txId); - } - - private: - const TDataShardEngineHost* const Host; - }; - - void AddReadConflict(ui64 txId) const { - Y_ABORT_UNLESS(LockTxId); - - // We have detected uncommitted changes in txId that could affect - // our read result. We arrange a conflict that breaks our lock - // when txId commits. - Self->SysLocksTable().AddReadConflict(txId); - } - - void CheckReadConflict(const TRowVersion& rowVersion) const { - Y_ABORT_UNLESS(LockTxId); - - if (rowVersion > ReadVersion) { - // We are reading from snapshot at ReadVersion and should not normally - // observe changes with a version above that. However, if we have an - // uncommitted change, that we fake as committed for our own changes - // visibility, we might shadow some change that happened after a - // snapshot. This is a clear indication of a conflict between read - // and that future conflict, hence we must break locks and abort. 
- Self->SysLocksTable().BreakSetLocks(); - EngineBay.GetKqpComputeCtx().SetInconsistentReads(); - } - } - - void CheckReadDependency(ui64 txId) const { - if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { - switch (info->State) { - case EVolatileTxState::Waiting: - // We are reading undecided changes and need to wait until they are resolved - EngineBay.GetKqpComputeCtx().AddVolatileReadDependency(info->TxId); - break; - case EVolatileTxState::Committed: - // Committed changes are immediately visible and don't need a dependency - break; - case EVolatileTxState::Aborting: - // We just read something that we know is aborting, we would have to retry later - EngineBay.GetKqpComputeCtx().AddVolatileReadDependency(info->TxId); - break; - } - } + return UserDb.GetReadTxObserver(tableId); } bool NeedToReadBeforeWrite(const TTableId& tableId) const override { - if (Self->GetVolatileTxManager().GetTxMap()) { - return true; - } - - if (Self->SysLocksTable().HasWriteLocks(tableId)) { - return true; - } - - if (auto* collector = GetChangeCollector(tableId)) { - if (collector->NeedToReadKeys()) { - return true; - } - } - - return false; - } - - void CheckWriteConflicts(const TTableId& tableId, TArrayRef keyCells) { - // When there are uncommitted changes (write locks) we must find which - // locks would break upon commit. - bool mustFindConflicts = Self->SysLocksTable().HasWriteLocks(tableId); - - // When there are volatile changes (tx map) we try to find precise - // dependencies, but we may switch to total order on page faults. 
- const bool tryFindConflicts = mustFindConflicts || - (!VolatileCommitOrdered && Self->GetVolatileTxManager().GetTxMap()); - - if (!tryFindConflicts) { - // We don't need to find conflicts - return; - } - - const auto localTid = LocalTableId(tableId); - Y_ABORT_UNLESS(localTid); - - ui64 skipCount = 0; - - NTable::ITransactionObserverPtr txObserver; - if (LockTxId) { - // We cannot use cached conflicts since we need to find skip count - txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid); - // Locked writes are immediate, increased latency is not critical - mustFindConflicts = true; - } else if (auto* cached = Self->GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) { - for (ui64 txId : *cached) { - BreakWriteConflict(txId); - } - return; - } else { - txObserver = new TWriteTxObserver(this); - // Prefer precise conflicts for non-distributed transactions - if (IsImmediateTx) { - mustFindConflicts = true; - } - } - - // We are not actually interested in the row version, we only need to - // detect uncommitted transaction skips on the path to that version. 
- auto res = Db.SelectRowVersion( - localTid, keyCells, /* readFlags */ 0, - nullptr, txObserver); - - if (res.Ready == NTable::EReady::Page) { - if (mustFindConflicts || LockTxId) { - // We must gather all conflicts - throw TNotReadyTabletException(); - } - - // Upgrade to volatile ordered commit and ignore the page fault - if (!VolatileCommitOrdered) { - if (!VolatileTxId) { - VolatileTxId = EngineBay.GetTxId(); - } - VolatileCommitOrdered = true; - VolatileDependencies.clear(); - } - return; - } - - if (LockTxId || VolatileTxId) { - ui64 skipLimit = Self->GetMaxLockedWritesPerKey(); - if (skipLimit > 0 && skipCount >= skipLimit) { - throw TLockedWriteLimitException(); - } - } - } - - class TLockedWriteTxObserver : public NTable::ITransactionObserver { - public: - TLockedWriteTxObserver(TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid) - : Host(host) - , SelfTxId(txId) - , SkipCount(skipCount) - , LocalTid(localTid) - { - } - - void OnSkipUncommitted(ui64 txId) override { - // Note: all active volatile transactions will be uncommitted - // without a tx map, and will be handled by AddWriteConflict. 
- if (!Host->Db.HasRemovedTx(LocalTid, txId)) { - ++SkipCount; - if (!SelfFound) { - if (txId != SelfTxId) { - Host->AddWriteConflict(txId); - } else { - SelfFound = true; - } - } - } - } - - void OnSkipCommitted(const TRowVersion&) override { - // nothing - } - - void OnSkipCommitted(const TRowVersion&, ui64) override { - // nothing - } - - void OnApplyCommitted(const TRowVersion&) override { - // nothing - } - - void OnApplyCommitted(const TRowVersion&, ui64) override { - // nothing - } - - private: - TDataShardEngineHost* const Host; - const ui64 SelfTxId; - ui64& SkipCount; - const ui32 LocalTid; - bool SelfFound = false; - }; - - class TWriteTxObserver : public NTable::ITransactionObserver { - public: - TWriteTxObserver(TDataShardEngineHost* host) - : Host(host) - { - } - - void OnSkipUncommitted(ui64 txId) override { - // Note: all active volatile transactions will be uncommitted - // without a tx map, and will be handled by BreakWriteConflict. - Host->BreakWriteConflict(txId); - } - - void OnSkipCommitted(const TRowVersion&) override { - // nothing - } - - void OnSkipCommitted(const TRowVersion&, ui64) override { - // nothing - } - - void OnApplyCommitted(const TRowVersion&) override { - // nothing - } - - void OnApplyCommitted(const TRowVersion&, ui64) override { - // nothing - } - - private: - TDataShardEngineHost* const Host; - }; - - void AddWriteConflict(ui64 txId) const { - if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { - if (info->State != EVolatileTxState::Aborting) { - Self->SysLocksTable().AddVolatileDependency(info->TxId); - } - } else { - Self->SysLocksTable().AddWriteConflict(txId); - } - } - - void BreakWriteConflict(ui64 txId) { - if (VolatileCommitTxIds.contains(txId)) { - // Skip our own commits - } else if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { - // We must not overwrite uncommitted changes that may become committed - // later, so we need to add a dependency that will force us to wait - 
// until it is persistently committed. We may ignore aborting changes - // even though they may not be persistent yet, since this tx will - // also perform writes, and either it fails, or future generation - // could not have possibly committed it already. - if (info->State != EVolatileTxState::Aborting && !VolatileCommitOrdered) { - if (!VolatileTxId) { - // All further writes will use this VolatileTxId and will - // add it to VolatileCommitTxIds, forcing it to be committed - // like a volatile transaction. Note that this does not make - // it into a real volatile transaction, it works as usual in - // every sense, only persistent commit order is affected by - // a dependency below. - VolatileTxId = EngineBay.GetTxId(); - } - VolatileDependencies.insert(info->TxId); - } - } else { - // Break uncommitted locks - Self->SysLocksTable().BreakLock(txId); - } + return UserDb.NeedToReadBeforeWrite(tableId); } private: const TDataShardSysTable& DataShardSysTable(const TTableId& tableId) const { - return static_cast(Self->GetDataShardSysTables())->Get(tableId); + return static_cast(Self->GetDataShardSysTables())->Get(tableId); } TKeyValidator& GetKeyValidator() { @@ -995,51 +451,41 @@ class TDataShardEngineHost final return EngineBay.GetKeyValidator(); } +public: + TDataShardUserDb& GetUserDb() { + return UserDb; + } + const TDataShardUserDb& GetUserDb() const { + return UserDb; + } + +private: TDataShard* Self; TEngineBay& EngineBay; - NTable::TDatabase& DB; - const ui64& LockTxId; - const ui32& LockNodeId; - bool IsImmediateTx = false; - bool IsRepeatableSnapshot = false; - TInstant Now; - TRowVersion WriteVersion = TRowVersion::Max(); - TRowVersion ReadVersion = TRowVersion::Min(); - ui64 VolatileTxId = 0; - absl::flat_hash_set CommittedLockChanges; - mutable absl::flat_hash_map> ChangeCollectors; - mutable absl::flat_hash_map> TxMaps; - mutable absl::flat_hash_map TxObservers; - mutable absl::flat_hash_set VolatileCommitTxIds; - mutable absl::flat_hash_set 
VolatileDependencies; - std::optional ChangeGroup = std::nullopt; - bool VolatileCommitOrdered = false; + mutable TDataShardUserDb UserDb; }; // -TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx, - std::pair stepTxId) +TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId) : StepTxId(stepTxId) , KeyValidator(*self, txc.DB) - , LockTxId(0) - , LockNodeId(0) { auto now = TAppData::TimeProvider->Now(); - EngineHost = MakeHolder(self, *this, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now); + EngineHost = MakeHolder(self, *this, txc.DB, stepTxId.TxId, EngineHostCounters, now); EngineSettings = MakeHolder(IEngineFlat::EProtocol::V1, AppData(ctx)->FunctionRegistry, *TAppData::RandomProvider, *TAppData::TimeProvider, EngineHost.Get(), self->AllocCounters); auto tabletId = self->TabletID(); - auto txId = stepTxId.second; + auto txId = stepTxId.TxId; const TActorSystem* actorSystem = ctx.ExecutorThread.ActorSystem; EngineSettings->LogErrorWriter = [actorSystem, tabletId, txId](const TString& message) { LOG_ERROR_S(*actorSystem, NKikimrServices::MINIKQL_ENGINE, "Shard %" << tabletId << ", txid %" <Satisfies(NLog::PRI_DEBUG, NKikimrServices::MINIKQL_ENGINE, stepTxId.second)) { + if (ctx.LoggerSettings()->Satisfies(NLog::PRI_DEBUG, NKikimrServices::MINIKQL_ENGINE, txId)) { EngineSettings->BacktraceWriter = [actorSystem, tabletId, txId](const char * operation, ui32 line, const TBackTrace* backtrace) { @@ -1055,7 +501,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor "Shard %" << tabletId << ", txid %" << txId << ": " << message); }; - ComputeCtx = MakeHolder(self, *EngineHost, now); + ComputeCtx = MakeHolder(self, GetUserDb(), EngineHost->GetSettings().DisableByKeyFilter); ComputeCtx->Database = &txc.DB; KqpAlloc = MakeHolder(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators()); @@ 
-1064,7 +510,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor auto kqpApplyCtx = MakeHolder(); kqpApplyCtx->Host = EngineHost.Get(); - kqpApplyCtx->ShardTableStats = &ComputeCtx->GetDatashardCounters(); + kqpApplyCtx->ShardTableStats = &EngineHostCounters; kqpApplyCtx->Env = KqpTypeEnv.Get(); KqpApplyCtx.Reset(kqpApplyCtx.Release()); @@ -1212,15 +658,32 @@ bool TEngineBay::GetVolatileCommitOrdered() const { IEngineFlat * TEngineBay::GetEngine() { if (!Engine) { Engine = CreateEngineFlat(*EngineSettings); - Engine->SetStepTxId(StepTxId); + Engine->SetStepTxId(StepTxId.ToPair()); } return Engine.Get(); } +TDataShardUserDb& TEngineBay::GetUserDb() { + Y_ABORT_UNLESS(EngineHost); + + auto* host = static_cast(EngineHost.Get()); + return host->GetUserDb(); +} +const TDataShardUserDb& TEngineBay::GetUserDb() const { + Y_ABORT_UNLESS(EngineHost); + + const auto* host = static_cast(EngineHost.Get()); + return host->GetUserDb(); +} + void TEngineBay::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { - LockTxId = lockTxId; - LockNodeId = lockNodeId; + Y_ABORT_UNLESS(EngineHost); + + auto* host = static_cast(EngineHost.Get()); + host->GetUserDb().SetLockTxId(lockTxId); + host->GetUserDb().SetLockNodeId(lockNodeId); + if (ComputeCtx) { ComputeCtx->SetLockTxId(lockTxId, lockNodeId); } diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 6ec942aaeb6f..3c0808e0f470 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -3,6 +3,7 @@ #include "defs.h" #include "change_collector.h" #include "key_validator.h" +#include "operation.h" #include #include @@ -21,6 +22,7 @@ using NTabletFlatExecutor::TTransactionContext; namespace NDataShard { class TDataShard; +class TDataShardUserDb; TIntrusivePtr InitDataShardSysTables(TDataShard* self); @@ -41,14 +43,17 @@ class TEngineBay : TNonCopyable { ui64 TotalKeysSize = 0; }; - 
TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx, - std::pair stepTxId); + TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId); virtual ~TEngineBay(); const NMiniKQL::IEngineFlat * GetEngine() const { return Engine.Get(); } NMiniKQL::IEngineFlat * GetEngine(); NMiniKQL::TEngineHost * GetEngineHost() { return EngineHost.Get(); } + + TDataShardUserDb& GetUserDb(); + const TDataShardUserDb& GetUserDb() const; + void SetLockTxId(ui64 lockTxId, ui32 lockNodeId); void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; } @@ -86,11 +91,9 @@ class TEngineBay : TNonCopyable { EngineHost.Reset(); } - ui64 GetStep() const { return StepTxId.first; } - ui64 GetTxId() const { return StepTxId.second; } - TKeyValidator& GetKeyValidator() { return KeyValidator; } const TKeyValidator& GetKeyValidator() const { return KeyValidator; } + TValidationInfo& TxInfo() { return KeyValidator.GetInfo(); } const TValidationInfo& TxInfo() const { return KeyValidator.GetInfo(); } TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const; @@ -117,14 +120,12 @@ class TEngineBay : TNonCopyable { NMiniKQL::TKqpDatashardComputeContext& GetKqpComputeCtx(); private: - std::pair StepTxId; + TStepOrder StepTxId; THolder EngineHost; THolder EngineSettings; THolder Engine; TKeyValidator KeyValidator; TEngineHostCounters EngineHostCounters; - ui64 LockTxId; - ui32 LockNodeId; NYql::NDq::TLogFunc KqpLogFunc; THolder KqpApplyCtx; THolder ComputeCtx; diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index dff9387950bf..b54f21d39a82 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -21,7 +21,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, bool usesMvccSnapshot) : StepTxId_(stepTxId) , TxBody(txBody) - , EngineBay(self, txc, ctx, 
stepTxId.ToPair()) + , EngineBay(self, txc, ctx, stepTxId) , ErrCode(NKikimrTxDataShard::TError::OK) , TxSize(0) , TxCacheUsage(0) @@ -41,6 +41,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, return; } + auto& typeRegistry = *AppData()->TypeRegistry; + ComputeTxSize(); NActors::NMemory::TLabel::Add(TxSize); @@ -79,7 +81,6 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, return; } - auto& typeRegistry = *AppData()->TypeRegistry; auto& computeCtx = EngineBay.GetKqpComputeCtx(); try { @@ -140,7 +141,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, } } - KqpSetTxKeys(tabletId, task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay); + KqpSetTxKeys(tabletId, task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay.GetKeyValidator()); for (auto& output : task.GetOutputs()) { for (auto& channel : output.GetChannels()) { @@ -156,7 +157,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, IsReadOnly = IsReadOnly && Tx.GetReadOnly(); - KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); + KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay.GetKeyValidator()); EngineBay.MarkTxLoaded(); auto& tasksRunner = GetKqpTasksRunner(); // create tasks runner, can throw TMemoryLimitExceededException @@ -227,7 +228,8 @@ bool TValidatedDataTx::ReValidateKeys() using EResult = NMiniKQL::IEngineFlat::EResult; if (IsKqpTx()) { - auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo()); + TKeyValidator::TValidateOptions options(EngineBay.GetUserDb()); + auto [result, error] = EngineBay.GetKeyValidator().ValidateKeys(options); if (result != EResult::Ok) { ErrStr = std::move(error); ErrCode = ConvertErrCode(result); diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 46c8977a6335..ff31a995f507 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -74,7 +74,8 @@ 
bool TCommonUploadOps::Execute(TDataShard* self, TTrans self->SysLocksTable().HasWriteLocks(fullTableId) || self->GetVolatileTxManager().GetTxMap()); - TDataShardUserDb userDb(*self, txc.DB, readVersion); + NMiniKQL::TEngineHostCounters engineHostCounters; + TDataShardUserDb userDb(*self, txc.DB, globalTxId, readVersion, writeVersion, engineHostCounters, TAppData::TimeProvider->Now()); TDataShardChangeGroupProvider groupProvider(*self, txc.DB); if (CollectChanges) { @@ -253,7 +254,7 @@ bool TCommonUploadOps::Execute(TDataShard* self, TTrans self->GetConflictsCache().GetTableCache(writeTableId).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB); if (!commitAdded) { // Make sure we see our own changes on further iterations - userDb.AddCommitTxId(globalTxId, writeVersion); + userDb.AddCommitTxId(fullTableId, globalTxId, writeVersion); commitAdded = true; } } else { diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index cc7835b8b3a5..0a4d6cc91d69 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -59,6 +59,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } } + std::optional engineHostCounters; std::optional userDb; std::optional groupProvider; @@ -69,7 +70,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( condition->Prepare(params.Txc->DB.GetRowScheme(localTableId), 0); } - userDb.emplace(*self, params.Txc->DB, params.ReadVersion); + engineHostCounters.emplace(); + userDb.emplace(*self, params.Txc->DB, params.GlobalTxId, params.ReadVersion, params.WriteVersion, *engineHostCounters, TAppData::TimeProvider->Now()); groupProvider.emplace(*self, params.Txc->DB); params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, *groupProvider, params.Txc->DB, tableInfo)); } @@ -178,7 +180,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( 
self->GetConflictsCache().GetTableCache(localTableId).AddUncommittedWrite(keyCells.GetCells(), params.GlobalTxId, params.Txc->DB); if (!commitAdded && userDb) { // Make sure we see our own changes on further iterations - userDb->AddCommitTxId(params.GlobalTxId, params.WriteVersion); + userDb->AddCommitTxId(fullTableId, params.GlobalTxId, params.WriteVersion); commitAdded = true; } } else { diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index b80353756869..b18bc8705cad 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -362,7 +362,7 @@ template void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const TUserTable* tableInfo, const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange& rangeKind, const TReadOpMeta* readMeta, const TWriteOpMeta* writeMeta, const NScheme::TTypeRegistry& typeRegistry, - const TActorContext& ctx, TEngineBay& engineBay) + const TActorContext& ctx, TKeyValidator& keyValidator) { if (Read) { Y_ABORT_UNLESS(readMeta); @@ -387,10 +387,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const Y_DEBUG_ABORT_UNLESS(!(tableRange.To.GetCells().empty() && tableRange.ToInclusive)); if constexpr (Read) { - engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), + keyValidator.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, + keyValidator.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -405,9 +405,9 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << DebugPrintPoint(tableInfo->KeyColumnTypes, 
tablePoint.From.GetCells(), typeRegistry)); if constexpr (Read) { - engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); + keyValidator.AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.GetKeyValidator().AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); + keyValidator.AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -424,10 +424,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << DebugPrintRange(tableInfo->KeyColumnTypes, tableRange.ToTableRange(), typeRegistry)); if constexpr (Read) { - engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), + keyValidator.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, + keyValidator.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } @@ -440,10 +440,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << ", task: " << taskId << ", " << (Read ? 
"read range: UNSPECIFIED" : "write range: UNSPECIFIED")); if constexpr (Read) { - engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(), + keyValidator.AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.GetKeyValidator().AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, + keyValidator.AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } @@ -456,7 +456,7 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta& meta, const NScheme::TTypeRegistry& typeRegistry, - const TActorContext& ctx, TEngineBay& engineBay) + const TActorContext& ctx, TKeyValidator& keyValidator) { auto& tableMeta = meta.GetTable(); auto tableId = TTableId(tableMeta.GetTableId().GetOwnerId(), tableMeta.GetTableId().GetTableId(), @@ -464,16 +464,16 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, for (auto& read : meta.GetReads()) { KqpSetTxKeysImpl(tabletId, taskId, tableId, tableInfo, read.GetRange(), &read, nullptr, - typeRegistry, ctx, engineBay); + typeRegistry, ctx, keyValidator); } if (meta.HasWrites()) { KqpSetTxKeysImpl(tabletId, taskId, tableId, tableInfo, meta.GetWrites().GetRange(), nullptr, - &meta.GetWrites(), typeRegistry, ctx, engineBay); + &meta.GetWrites(), typeRegistry, ctx, keyValidator); } } -void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay) { +void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLocks& sysLocks, TKeyValidator& keyValidator) { if (locks.LocksSize() == 0) { return; } @@ -491,10 +491,10 @@ 
void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLock if (sysLocks.IsMyKey(lockKey)) { auto point = TTableRange(lockKey, true, {}, true, /* point */ true); if (NeedValidateLocks(locks.GetOp())) { - engineBay.GetKeyValidator().AddReadRange(sysLocksTableId, {}, point, lockRowType); + keyValidator.AddReadRange(sysLocksTableId, {}, point, lockRowType); } if (NeedEraseLocks(locks.GetOp())) { - engineBay.GetKeyValidator().AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true); + keyValidator.AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true); } } } diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 2109af8c7336..54b9c4746958 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -16,9 +16,9 @@ bool KqpValidateTransaction(const ::google::protobuf::RepeatedPtrField<::NYql::N void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta& meta, const NScheme::TTypeRegistry& typeRegistry, - const TActorContext& ctx, TEngineBay& engineBay); + const TActorContext& ctx, TKeyValidator& keyValidator); -void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLocks& sysLocks, TEngineBay& engineBay); +void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLocks& sysLocks, TKeyValidator& keyValidator); NYql::NDq::ERunStatus KqpRunTransaction(const TActorContext& ctx, ui64 txId, const NKikimrDataEvents::TKqpLocks& kqpLocks, bool useGenericReadSets, NKqp::TKqpTasksRunner& tasksRunner); diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index c7da246b0233..76bde7dcc2e4 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -1,5 +1,6 @@ #include "datashard_kqp_compute.h" #include 
"range_ops.h" +#include "datashard_user_db.h" #include #include @@ -86,6 +87,13 @@ TComputationNodeFactory GetKqpScanComputeFactory(TKqpScanComputeContext* compute }; } +TKqpDatashardComputeContext::TKqpDatashardComputeContext(NDataShard::TDataShard* shard, NDataShard::TDataShardUserDb& userDb, bool disableByKeyFilter) + : Shard(shard) + , UserDb(userDb) + , DisableByKeyFilter(disableByKeyFilter) +{ +} + ui64 TKqpDatashardComputeContext::GetLocalTableId(const TTableId &tableId) const { MKQL_ENSURE_S(Shard); return Shard->GetLocalTableId(tableId); @@ -138,38 +146,42 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable } void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const { - if (LockTxId) { + if (UserDb.GetLockTxId()) { Shard->SysLocksTable().SetLock(tableId, range); } - Shard->SetTableAccessTime(tableId, Now); + Shard->SetTableAccessTime(tableId, UserDb.GetNow()); } void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef& key) const { - if (LockTxId) { + if (UserDb.GetLockTxId()) { Shard->SysLocksTable().SetLock(tableId, key); } - Shard->SetTableAccessTime(tableId, Now); + Shard->SetTableAccessTime(tableId, UserDb.GetNow()); } void TKqpDatashardComputeContext::BreakSetLocks() const { - if (LockTxId) { + if (UserDb.GetLockTxId()) { Shard->SysLocksTable().BreakSetLocks(); } } void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { - LockTxId = lockTxId; - LockNodeId = lockNodeId; + UserDb.SetLockTxId(lockTxId); + UserDb.SetLockNodeId(lockNodeId); } void TKqpDatashardComputeContext::SetReadVersion(TRowVersion readVersion) { - ReadVersion = readVersion; + UserDb.SetReadVersion(readVersion); } TRowVersion TKqpDatashardComputeContext::GetReadVersion() const { - Y_ABORT_UNLESS(!ReadVersion.IsMin(), "Cannot perform reads without ReadVersion set"); + Y_ABORT_UNLESS(!UserDb.GetReadVersion().IsMin(), "Cannot perform reads without 
ReadVersion set"); + + return UserDb.GetReadVersion(); +} - return ReadVersion; +TEngineHostCounters& TKqpDatashardComputeContext::GetDatashardCounters() { + return UserDb.GetCounters(); } void TKqpDatashardComputeContext::SetTaskOutputChannel(ui64 taskId, ui64 channelId, TActorId actorId) { @@ -186,8 +198,7 @@ TActorId TKqpDatashardComputeContext::GetTaskOutputChannel(ui64 taskId, ui64 cha void TKqpDatashardComputeContext::Clear() { Database = nullptr; - LockTxId = 0; - LockNodeId = 0; + SetLockTxId(0, 0); } bool TKqpDatashardComputeContext::PinPages(const TVector& keys, ui64 pageFaultCount) { @@ -225,7 +236,7 @@ bool TKqpDatashardComputeContext::PinPages(const TVector TKqpDatashardComputeContext::ValidateKeys( - const IEngineFlat::TValidationInfo& validationInfo) -{ - for (auto& validKey : validationInfo.Keys) { - TKeyDesc * key = validKey.Key.get(); - - bool valid = EngineHost.IsValidKey(*key); - - if (valid) { - auto curSchemaVersion = EngineHost.GetTableSchemaVersion(key->TableId); - if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) { - auto error = TStringBuilder() - << "Schema version missmatch for table id: " << key->TableId - << " mkql compiled on: " << key->TableId.SchemaVersion - << " current version: " << curSchemaVersion; - return {IEngineFlat::EResult::SchemeChanged, std::move(error)}; - } - } else { - switch (key->Status) { - case TKeyDesc::EStatus::SnapshotNotExist: - return {IEngineFlat::EResult::SnapshotNotExist, ""}; - case TKeyDesc::EStatus::SnapshotNotReady: - key->Status = TKeyDesc::EStatus::Ok; - return {IEngineFlat::EResult::SnapshotNotReady, ""}; - default: - auto error = TStringBuilder() - << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status; - return {IEngineFlat::EResult::KeyError, std::move(error)}; - } - } - } - - return {IEngineFlat::EResult::Ok, ""}; -} - static void BuildRowImpl(const TDbTupleRef& dbTuple, const THolderFactory& holderFactory, const 
TSmallVec& systemColumnTags, ui64 shardId, NUdf::TUnboxedValue& result, size_t& rowSize) { @@ -400,10 +376,10 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRefSelect(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), - EngineHost.GetReadTxMap(tableId), - EngineHost.GetReadTxObserver(tableId)); + UserDb.GetReadTxMap(tableId), + UserDb.GetReadTxObserver(tableId)); if (InconsistentReads) { return false; @@ -458,8 +434,8 @@ TAutoPtr TKqpDatashardComputeContext::CreateIterator(const TTa TouchTableRange(tableId, range); return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), - EngineHost.GetReadTxMap(tableId), - EngineHost.GetReadTxObserver(tableId)); + UserDb.GetReadTxMap(tableId), + UserDb.GetReadTxObserver(tableId)); } TAutoPtr TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId, @@ -481,8 +457,8 @@ TAutoPtr TKqpDatashardComputeContext::CreateReverseIter TouchTableRange(tableId, range); return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), - EngineHost.GetReadTxMap(tableId), - EngineHost.GetReadTxObserver(tableId)); + UserDb.GetReadTxMap(tableId), + UserDb.GetReadTxObserver(tableId)); } template @@ -617,5 +593,12 @@ bool TKqpDatashardComputeContext::ReadRowWide(const TTableId& tableId, NTable::T return ReadRowWideImpl(tableId, iterator, systemColumnTags, skipNullKeys, result, stats); } +bool TKqpDatashardComputeContext::HasVolatileReadDependencies() const { + return !UserDb.GetVolatileReadDependencies().empty(); +} +const absl::flat_hash_set& TKqpDatashardComputeContext::GetVolatileReadDependencies() const { + return UserDb.GetVolatileReadDependencies(); +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 03943e72e112..872b73e0ff80 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ 
b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -10,6 +10,7 @@ namespace NKikimr { namespace NDataShard { class TExecuteKqpScanTxUnit; class TDataShard; + class TDataShardUserDb; } } @@ -22,10 +23,7 @@ using TKqpTableStats = TEngineHostCounters; class TKqpDatashardComputeContext : public TKqpComputeContextBase { public: - TKqpDatashardComputeContext(NDataShard::TDataShard* shard, TEngineHost& engineHost, TInstant now) - : Shard(shard) - , EngineHost(engineHost) - , Now(now) {} + TKqpDatashardComputeContext(NDataShard::TDataShard* shard, NDataShard::TDataShardUserDb& userDb, bool disableByKeyFilter); ui64 GetLocalTableId(const TTableId& tableId) const; TString GetTablePath(const TTableId& tableId) const; @@ -51,9 +49,7 @@ class TKqpDatashardComputeContext : public TKqpComputeContextBase { TRowVersion GetReadVersion() const; TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; } - TEngineHostCounters& GetDatashardCounters() { return EngineHost.GetCounters(); } - - std::pair ValidateKeys(const IEngineFlat::TValidationInfo& validationInfo); + TEngineHostCounters& GetDatashardCounters(); bool IsTabletNotReady() const { return TabletNotReady; } @@ -86,10 +82,8 @@ class TKqpDatashardComputeContext : public TKqpComputeContextBase { bool HadInconsistentReads() const { return InconsistentReads; } void SetInconsistentReads() { InconsistentReads = true; } - bool HasVolatileReadDependencies() const { return !VolatileReadDependencies.empty(); } - const absl::flat_hash_set& GetVolatileReadDependencies() const { return VolatileReadDependencies; } - void AddVolatileReadDependency(ui64 txId) { VolatileReadDependencies.insert(txId); } - + bool HasVolatileReadDependencies() const; + const absl::flat_hash_set& GetVolatileReadDependencies() const; private: void TouchTableRange(const TTableId& tableId, const TTableRange& range) const; void TouchTablePoint(const TTableId& tableId, const TArrayRef& key) const; @@ -112,16 +106,12 @@ class 
TKqpDatashardComputeContext : public TKqpComputeContextBase { private: NDataShard::TDataShard* Shard; std::unordered_map TaskCounters; - TEngineHost& EngineHost; - TInstant Now; - ui64 LockTxId = 0; - ui32 LockNodeId = 0; + NDataShard::TDataShardUserDb& UserDb; + bool DisableByKeyFilter; bool PersistentChannels = false; bool TabletNotReady = false; bool InconsistentReads = false; - TRowVersion ReadVersion = TRowVersion::Min(); THashMap, TActorId> OutputChannels; - absl::flat_hash_set VolatileReadDependencies; }; class TKqpDatashardApplyContext : public NUdf::IApplyContext { diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp index c79ba81a64fe..59edf2e6699d 100644 --- a/ydb/core/tx/datashard/datashard_user_db.cpp +++ b/ydb/core/tx/datashard/datashard_user_db.cpp @@ -2,6 +2,20 @@ namespace NKikimr::NDataShard { +TDataShardUserDb::TDataShardUserDb(TDataShard& self, NTable::TDatabase& db, ui64 globalTxId, const TRowVersion& readVersion, const TRowVersion& writeVersion, NMiniKQL::TEngineHostCounters& counters, TInstant now) + : Self(self) + , Db(db) + , ChangeGroupProvider(self, db) + , GlobalTxId(globalTxId) + , LockTxId(0) + , LockNodeId(0) + , ReadVersion(readVersion) + , WriteVersion(writeVersion) + , Now(now) + , Counters(counters) +{ +} + NTable::EReady TDataShardUserDb::SelectRow( const TTableId& tableId, TArrayRef key, @@ -15,8 +29,8 @@ NTable::EReady TDataShardUserDb::SelectRow( return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0, readVersion.GetOrElse(ReadVersion), - GetReadTxMap(), - GetReadTxObserver()); + GetReadTxMap(tableId), + GetReadTxObserver(tableId)); } NTable::EReady TDataShardUserDb::SelectRow( @@ -30,78 +44,679 @@ NTable::EReady TDataShardUserDb::SelectRow( return SelectRow(tableId, key, tags, row, stats, readVersion); } -void TDataShardUserDb::AddCommitTxId(ui64 txId, const TRowVersion& commitVersion) { - if (!DynamicTxMap) { - DynamicTxMap = new 
NTable::TDynamicTransactionMap(Self.GetVolatileTxManager().GetTxMap()); - TxMap = DynamicTxMap; +ui64 CalculateKeyBytes(const TArrayRef key) { + return std::accumulate(key.begin(), key.end(), 0, [](ui64 bytes, const TRawTypeValue& value) { return bytes + value.IsEmpty() ? 1 : value.Size(); }); +}; + +ui64 CalculateValueBytes(const TArrayRef ops) { + return std::accumulate(ops.begin(), ops.end(), 0, [](ui64 bytes, const NIceDb::TUpdateOp& op) { return bytes + op.Value.IsEmpty() ? 1 : op.Value.Size(); }); +}; + +void TDataShardUserDb::UpdateRow( + const TTableId& tableId, + const TArrayRef key, + const TArrayRef ops) +{ + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected UpdateRow for an unknown table"); + + ui64 valueBytes; + + // apply special columns if declared + TUserTable::TSpecialUpdate specUpdates = Self.SpecialUpdates(Db, tableId); + if (specUpdates.HasUpdates) { + const NTable::TScheme& scheme = Db.GetScheme(); + const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTableId); + + TStackVec extendedOps; + extendedOps.reserve(ops.size() + 3); + for (const NIceDb::TUpdateOp& op : ops) { + if (op.Tag == specUpdates.ColIdTablet) + specUpdates.ColIdTablet = Max(); + else if (op.Tag == specUpdates.ColIdEpoch) + specUpdates.ColIdEpoch = Max(); + else if (op.Tag == specUpdates.ColIdUpdateNo) + specUpdates.ColIdUpdateNo = Max(); + + extendedOps.push_back(op); + } + + auto addExtendedOp = [&scheme, &tableInfo, &extendedOps](const ui64 columnTag, const ui64& columnValue) { + const NScheme::TTypeInfo vtype = scheme.GetColumnInfo(tableInfo, columnTag)->PType; + const char* ptr = static_cast(static_cast(&columnValue)); + TRawTypeValue rawTypeValue(ptr, sizeof(ui64), vtype); + NIceDb::TUpdateOp extOp(columnTag, NTable::ECellOp::Set, rawTypeValue); + extendedOps.emplace_back(extOp); + }; + + if (specUpdates.ColIdTablet != Max()) { + addExtendedOp(specUpdates.ColIdTablet, specUpdates.Tablet); + } + + if 
(specUpdates.ColIdEpoch != Max()) { + addExtendedOp(specUpdates.ColIdEpoch, specUpdates.Epoch); + } + + if (specUpdates.ColIdUpdateNo != Max()) { + addExtendedOp(specUpdates.ColIdUpdateNo, specUpdates.UpdateNo); + } + UpdateRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, extendedOps); + + valueBytes = CalculateValueBytes(extendedOps); + } else { + UpdateRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops); + + valueBytes = CalculateValueBytes(ops); } - DynamicTxMap->Add(txId, commitVersion); + + ui64 keyBytes = CalculateKeyBytes(key); + + Counters.NUpdateRow++; + Counters.UpdateRowBytes += keyBytes + valueBytes; } -NTable::ITransactionMapPtr& TDataShardUserDb::GetReadTxMap() { - if (!TxMap) { - auto baseTxMap = Self.GetVolatileTxManager().GetTxMap(); - if (baseTxMap) { - DynamicTxMap = new NTable::TDynamicTransactionMap(baseTxMap); - TxMap = DynamicTxMap; +void TDataShardUserDb::EraseRow( + const TTableId& tableId, + const TArrayRef key) +{ + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected UpdateRow for an unknown table"); + + UpdateRowInt(NTable::ERowOp::Erase, tableId, localTableId, key, {}); + + ui64 keyBytes = CalculateKeyBytes(key); + + Counters.NEraseRow++; + Counters.EraseRowBytes += keyBytes + 8; +} + +void TDataShardUserDb::UpdateRowInt( + NTable::ERowOp rowOp, + const TTableId& tableId, + ui64 localTableId, + const TArrayRef key, + const TArrayRef ops) +{ + TSmallVec keyCells = ConvertTableKeys(key); + + CheckWriteConflicts(tableId, keyCells); + + if (LockTxId) { + Self.SysLocksTable().SetWriteLock(tableId, keyCells); + } else { + Self.SysLocksTable().BreakLocks(tableId, keyCells); + } + Self.SetTableUpdateTime(tableId, Now); + + auto* collector = GetChangeCollector(tableId); + + const ui64 writeTxId = GetWriteTxId(tableId); + if (writeTxId == 0) { + if (collector && !collector->OnUpdate(tableId, localTableId, rowOp, key, ops, WriteVersion)) + throw TNotReadyTabletException(); + + 
Db.Update(localTableId, rowOp, key, ops, WriteVersion); + } else { + if (collector && !collector->OnUpdateTx(tableId, localTableId, rowOp, key, ops, writeTxId)) + throw TNotReadyTabletException(); + + Db.UpdateTx(localTableId, rowOp, key, ops, writeTxId); + } + + if (VolatileTxId) { + Self.GetConflictsCache().GetTableCache(localTableId).AddUncommittedWrite(keyCells, VolatileTxId, Db); + } else if (LockTxId) { + Self.GetConflictsCache().GetTableCache(localTableId).AddUncommittedWrite(keyCells, LockTxId, Db); + } else { + Self.GetConflictsCache().GetTableCache(localTableId).RemoveUncommittedWrites(keyCells, Db); + } + + Self.GetKeyAccessSampler()->AddSample(tableId, keyCells); +} + +TSmallVec TDataShardUserDb::ConvertTableKeys(const TArrayRef key) +{ + TSmallVec keyCells; + keyCells.reserve(key.size()); + std::transform(key.begin(), key.end(), std::back_inserter(keyCells), [](const TRawTypeValue& x) { return TCell(&x); }); + return keyCells; +} + +IDataShardChangeCollector* TDataShardUserDb::GetChangeCollector(const TTableId& tableId) { + auto it = ChangeCollectors.find(tableId.PathId); + if (it != ChangeCollectors.end()) { + return it->second.Get(); + } + + it = ChangeCollectors.emplace(tableId.PathId, nullptr).first; + if (!Self.IsUserTable(tableId)) { + return it->second.Get(); + } + + it->second.Reset(CreateChangeCollector( + Self, + *const_cast(this), + *const_cast(this), + Db, + tableId.PathId.LocalPathId + )); + return it->second.Get(); +} + +TVector TDataShardUserDb::GetCollectedChanges() const { + TVector total; + + for (auto& [_, collector] : ChangeCollectors) { + if (!collector) { + continue; } + + auto collected = std::move(collector->GetCollected()); + std::move(collected.begin(), collected.end(), std::back_inserter(total)); } - return TxMap; + + return total; +} + +void TDataShardUserDb::ResetCollectedChanges() { + for (auto& pr : ChangeCollectors) { + if (pr.second) { + pr.second->OnRestart(); + } + } +} + +std::optional 
TDataShardUserDb::GetCurrentChangeGroup() const { + return ChangeGroupProvider.GetCurrentChangeGroup(); } -class TDataShardUserDb::TReadTxObserver : public NTable::ITransactionObserver { +ui64 TDataShardUserDb::GetChangeGroup() { + // Distributed transactions have their group set to zero + if (!IsImmediateTx) + return 0; + + return ChangeGroupProvider.GetChangeGroup(); +} + +void TDataShardUserDb::CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) { + auto localTid = Self.GetLocalTableId(tableId); + Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self.TabletID()); + + if (!Db.HasOpenTx(localTid, lockId)) { + return; + } + + if (auto lock = Self.SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) { + lock->ForAllVolatileDependencies([this](ui64 txId) { + auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId); + if (info && info->State != EVolatileTxState::Aborting) { + if (VolatileDependencies.insert(txId).second && !VolatileTxId) { + SetVolatileTxId(GlobalTxId); + } + } + }); + } + + if (VolatileTxId) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self.TabletID()); + if (VolatileCommitTxIds.insert(lockId).second) { + // Update TxMap to include the new commit + auto it = TxMaps.find(tableId.PathId); + if (it != TxMaps.end()) { + it->second->Add(lockId, WriteVersion); + } + } + return; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self.TabletID()); + Db.CommitTx(localTid, lockId, writeVersion); + Self.GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(lockId, Db); + + if (!CommittedLockChanges.contains(lockId) && Self.HasLockChangeRecords(lockId)) { + if (auto* collector = GetChangeCollector(tableId)) { 
+ collector->CommitLockChanges(lockId, WriteVersion); + CommittedLockChanges.insert(lockId); + } + } +} + +void TDataShardUserDb::AddCommitTxId(const TTableId& tableId, ui64 txId, const TRowVersion& commitVersion) { + auto* dynamicTxMap = static_cast(GetReadTxMap(tableId).Get()); + dynamicTxMap->Add(txId, commitVersion); +} + +class TLockedReadTxObserver: public NTable::ITransactionObserver { +public: + TLockedReadTxObserver(IDataShardConflictChecker& conflictChecker) + : ConflictChecker(conflictChecker) + { + } + + void OnSkipUncommitted(ui64 txId) override { + ConflictChecker.AddReadConflict(txId); + } + + void OnSkipCommitted(const TRowVersion&) override { + // We already use InvisibleRowSkips for these + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // We already use InvisibleRowSkips for these + } + + void OnApplyCommitted(const TRowVersion& rowVersion) override { + ConflictChecker.CheckReadConflict(rowVersion); + } + + void OnApplyCommitted(const TRowVersion& rowVersion, ui64 txId) override { + ConflictChecker.CheckReadConflict(rowVersion); + ConflictChecker.CheckReadDependency(txId); + } + +private: + IDataShardConflictChecker& ConflictChecker; +}; + +class TReadTxObserver: public NTable::ITransactionObserver { public: - TReadTxObserver(TDataShardUserDb& userDb) - : UserDb(userDb) - { } + TReadTxObserver(IDataShardConflictChecker& conflictChecker) + : ConflictChecker(conflictChecker) + { + } void OnSkipUncommitted(ui64) override { - // nothing + // We don't care about uncommitted changes + // Any future commit is supposed to be above our read version } void OnSkipCommitted(const TRowVersion&) override { - // nothing + // We already use InvisibleRowSkips for these } void OnSkipCommitted(const TRowVersion&, ui64) override { - // nothing + // We already use InvisibleRowSkips for these } void OnApplyCommitted(const TRowVersion&) override { - // nothing + // Not needed } void OnApplyCommitted(const TRowVersion&, ui64 txId) override { - 
UserDb.CheckReadDependency(txId); + ConflictChecker.CheckReadDependency(txId); } private: - TDataShardUserDb& UserDb; + IDataShardConflictChecker& ConflictChecker; }; -NTable::ITransactionObserverPtr& TDataShardUserDb::GetReadTxObserver() { - if (!TxObserver) { - auto baseTxMap = Self.GetVolatileTxManager().GetTxMap(); - if (baseTxMap) { - TxObserver = new TReadTxObserver(*this); - } - } - return TxObserver; -} - void TDataShardUserDb::CheckReadDependency(ui64 txId) { if (auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId)) { switch (info->State) { case EVolatileTxState::Waiting: + // We are reading undecided changes and need to wait until they are resolved VolatileReadDependencies.insert(info->TxId); break; case EVolatileTxState::Committed: + // Committed changes are immediately visible and don't need a dependency break; case EVolatileTxState::Aborting: + // We just read something that we know is aborting, we would have to retry later VolatileReadDependencies.insert(info->TxId); break; } } } +class TLockedWriteTxObserver: public NTable::ITransactionObserver { +public: + TLockedWriteTxObserver(IDataShardConflictChecker& conflictChecker, NTable::TDatabase& db, ui64 txId, ui64& skipCount, ui32 localTableId) + : ConflictChecker(conflictChecker) + , Db(db) + , SelfTxId(txId) + , SkipCount(skipCount) + , LocalTid(localTableId) + { + } + + void OnSkipUncommitted(ui64 txId) override { + // Note: all active volatile transactions will be uncommitted + // without a tx map, and will be handled by AddWriteConflict. 
+ if (!Db.HasRemovedTx(LocalTid, txId)) { + ++SkipCount; + if (!SelfFound) { + if (txId != SelfTxId) { + ConflictChecker.AddWriteConflict(txId); + } else { + SelfFound = true; + } + } + } + } + + void OnSkipCommitted(const TRowVersion&) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&, ui64) override { + // nothing + } + +private: + IDataShardConflictChecker& ConflictChecker; + NTable::TDatabase& Db; + const ui64 SelfTxId; + ui64& SkipCount; + const ui32 LocalTid; + bool SelfFound = false; +}; + +class TWriteTxObserver: public NTable::ITransactionObserver { +public: + TWriteTxObserver(IDataShardConflictChecker& conflictChecker) + : ConflictChecker(conflictChecker) + { + } + + void OnSkipUncommitted(ui64 txId) override { + // Note: all active volatile transactions will be uncommitted + // without a tx map, and will be handled by BreakWriteConflict. + ConflictChecker.BreakWriteConflict(txId); + } + + void OnSkipCommitted(const TRowVersion&) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&, ui64) override { + // nothing + } + +private: + IDataShardConflictChecker& ConflictChecker; +}; + +void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef keyCells) { + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected CheckWriteConflicts for an unknown table"); + + // When there are uncommitted changes (write locks) we must find which + // locks would break upon commit. 
+ bool mustFindConflicts = Self.SysLocksTable().HasWriteLocks(tableId); + + // When there are volatile changes (tx map) we try to find precise + // dependencies, but we may switch to total order on page faults. + const bool tryFindConflicts = mustFindConflicts || + (!VolatileCommitOrdered && Self.GetVolatileTxManager().GetTxMap()); + + if (!tryFindConflicts) { + // We don't need to find conflicts + return; + } + + ui64 skipCount = 0; + + NTable::ITransactionObserverPtr txObserver; + if (LockTxId) { + // We cannot use cached conflicts since we need to find skip count + txObserver = new TLockedWriteTxObserver(*this, Db, LockTxId, skipCount, localTableId); + // Locked writes are immediate, increased latency is not critical + mustFindConflicts = true; + } else if (auto* cached = Self.GetConflictsCache().GetTableCache(localTableId).FindUncommittedWrites(keyCells)) { + for (ui64 txId : *cached) { + BreakWriteConflict(txId); + } + return; + } else { + txObserver = new TWriteTxObserver(*this); + // Prefer precise conflicts for non-distributed transactions + if (IsImmediateTx) { + mustFindConflicts = true; + } + } + + // We are not actually interested in the row version, we only need to + // detect uncommitted transaction skips on the path to that version. 
+ auto res = Db.SelectRowVersion( + localTableId, keyCells, /* readFlags */ 0, + nullptr, txObserver + ); + + if (res.Ready == NTable::EReady::Page) { + if (mustFindConflicts || LockTxId) { + // We must gather all conflicts + throw TNotReadyTabletException(); + } + + // Upgrade to volatile ordered commit and ignore the page fault + if (!VolatileCommitOrdered) { + if (!VolatileTxId) { + SetVolatileTxId(GlobalTxId); + } + VolatileCommitOrdered = true; + VolatileDependencies.clear(); + } + return; + } + + if (LockTxId || VolatileTxId) { + ui64 skipLimit = Self.GetMaxLockedWritesPerKey(); + if (skipLimit > 0 && skipCount >= skipLimit) { + throw TLockedWriteLimitException(); + } + } +} + +void TDataShardUserDb::AddWriteConflict(ui64 txId) { + if (auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId)) { + if (info->State != EVolatileTxState::Aborting) { + Self.SysLocksTable().AddVolatileDependency(info->TxId); + } + } else { + Self.SysLocksTable().AddWriteConflict(txId); + } +} + +void TDataShardUserDb::BreakWriteConflict(ui64 txId) { + if (VolatileCommitTxIds.contains(txId)) { + // Skip our own commits + } else if (auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId)) { + // We must not overwrite uncommitted changes that may become committed + // later, so we need to add a dependency that will force us to wait + // until it is persistently committed. We may ignore aborting changes + // even though they may not be persistent yet, since this tx will + // also perform writes, and either it fails, or future generation + // could not have possibly committed it already. + if (info->State != EVolatileTxState::Aborting && !VolatileCommitOrdered) { + if (!VolatileTxId) { + // All further writes will use this VolatileTxId and will + // add it to VolatileCommitTxIds, forcing it to be committed + // like a volatile transaction. 
Note that this does not make + // it into a real volatile transaction, it works as usual in + // every sense, only persistent commit order is affected by + // a dependency below. + SetVolatileTxId(GlobalTxId); + } + VolatileDependencies.insert(info->TxId); + } + } else { + // Break uncommitted locks + Self.SysLocksTable().BreakLock(txId); + } +} + +absl::flat_hash_set& TDataShardUserDb::GetVolatileReadDependencies() { + return VolatileReadDependencies; +} + +TVector TDataShardUserDb::GetVolatileCommitTxIds() const { + TVector commitTxIds; + + if (!VolatileCommitTxIds.empty()) { + commitTxIds.reserve(VolatileCommitTxIds.size()); + for (ui64 commitTxId : VolatileCommitTxIds) { + commitTxIds.push_back(commitTxId); + } + } + + return commitTxIds; +} + +ui64 TDataShardUserDb::GetWriteTxId(const TTableId& tableId) { + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected GetWriteTxId for an unknown table"); + + if (VolatileTxId) { + Y_ABORT_UNLESS(!LockTxId); + if (VolatileCommitTxIds.insert(VolatileTxId).second) { + // Update TxMap to include the new commit + auto it = TxMaps.find(tableId.PathId); + if (it != TxMaps.end()) { + it->second->Add(VolatileTxId, WriteVersion); + } + } + return VolatileTxId; + } + + return LockTxId; +} + +NTable::ITransactionMapPtr TDataShardUserDb::GetReadTxMap(const TTableId& tableId) { + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected GetReadTxMap for an unknown table"); + + auto baseTxMap = Self.GetVolatileTxManager().GetTxMap(); + + bool needTxMap = ( + // We need tx map when there are waiting volatile transactions + baseTxMap || + // We need tx map to see committed volatile tx changes + VolatileTxId && !VolatileCommitTxIds.empty() || + // We need tx map when current lock has uncommitted changes + LockTxId && Self.SysLocksTable().HasCurrentWriteLock(tableId) + ); + + if (!needTxMap) { + // We don't need tx map + return nullptr; + } + + auto& 
txMap = TxMaps[tableId.PathId]; + if (!txMap) { + txMap = new NTable::TDynamicTransactionMap(baseTxMap); + if (LockTxId) { + // Uncommitted changes are visible in all possible snapshots + txMap->Add(LockTxId, TRowVersion::Min()); + } else if (VolatileTxId) { + // We want committed volatile changes to be visible at the write version + for (ui64 commitTxId : VolatileCommitTxIds) { + txMap->Add(commitTxId, WriteVersion); + } + } + } + + return txMap; +} + + + +NTable::ITransactionObserverPtr TDataShardUserDb::GetReadTxObserver(const TTableId& tableId) { + auto localTableId = Self.GetLocalTableId(tableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected GetReadTxObserver for an unknown table"); + + bool needObserver = ( + // We need observer when there are waiting changes in the tx map + Self.GetVolatileTxManager().GetTxMap() || + // We need observer for locked reads when there are active write locks + LockTxId && Self.SysLocksTable().HasWriteLocks(tableId) + ); + + if (!needObserver) { + // We don't need tx observer + return nullptr; + } + + auto& ptr = TxObservers[tableId.PathId]; + if (!ptr) { + if (LockTxId) { + ptr = new TLockedReadTxObserver(*this); + } else { + ptr = new TReadTxObserver(*this); + } + } + + return ptr; +} + +void TDataShardUserDb::AddReadConflict(ui64 txId) const { + Y_ABORT_UNLESS(LockTxId); + + // We have detected uncommitted changes in txId that could affect + // our read result. We arrange a conflict that breaks our lock + // when txId commits. + Self.SysLocksTable().AddReadConflict(txId); +} + +void TDataShardUserDb::CheckReadConflict(const TRowVersion& rowVersion) const { + Y_ABORT_UNLESS(LockTxId); + + if (rowVersion > ReadVersion) { + // We are reading from snapshot at ReadVersion and should not normally + // observe changes with a version above that. However, if we have an + // uncommitted change, that we fake as committed for our own changes + // visibility, we might shadow some change that happened after a + // snapshot. 
This is a clear indication of a conflict between read + // and that future conflict, hence we must break locks and abort. + Self.SysLocksTable().BreakSetLocks(); + } +} + + + +bool TDataShardUserDb::NeedToReadBeforeWrite(const TTableId& tableId) { + if (Self.GetVolatileTxManager().GetTxMap()) { + return true; + } + + if (Self.SysLocksTable().HasWriteLocks(tableId)) { + return true; + } + + if (auto* collector = GetChangeCollector(tableId)) { + if (collector->NeedToReadKeys()) { + return true; + } + } + + return false; +} + +void TDataShardUserDb::ResetCounters() { + Counters = {}; +} + +NMiniKQL::TEngineHostCounters& TDataShardUserDb::GetCounters() { + return Counters; +} + +const NMiniKQL::TEngineHostCounters& TDataShardUserDb::GetCounters() const { + return Counters; +} + } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_user_db.h b/ydb/core/tx/datashard/datashard_user_db.h index 5cd95171dfba..2bf1f72fb90d 100644 --- a/ydb/core/tx/datashard/datashard_user_db.h +++ b/ydb/core/tx/datashard/datashard_user_db.h @@ -24,16 +24,47 @@ class IDataShardUserDb { TArrayRef tags, NTable::TRowState& row, const TMaybe& readVersion = {}) = 0; + + virtual void UpdateRow( + const TTableId& tableId, + const TArrayRef key, + const TArrayRef ops) = 0; + + virtual void EraseRow( + const TTableId& tableId, + const TArrayRef key) = 0; }; -class TDataShardUserDb final : public IDataShardUserDb { +class IDataShardConflictChecker { +protected: + ~IDataShardConflictChecker() = default; public: - TDataShardUserDb(TDataShard& self, NTable::TDatabase& db, const TRowVersion& readVersion) - : Self(self) - , Db(db) - , ReadVersion(readVersion) - { } + virtual void AddReadConflict(ui64 txId) const = 0; + virtual void CheckReadConflict(const TRowVersion& rowVersion) const = 0; + virtual void CheckReadDependency(ui64 txId) = 0; + + virtual void CheckWriteConflicts(const TTableId& tableId, TArrayRef keyCells) = 0; + virtual void AddWriteConflict(ui64 txId) = 0; + virtual 
void BreakWriteConflict(ui64 txId) = 0; +}; +class TDataShardUserDb final + : public IDataShardUserDb + , public IDataShardChangeGroupProvider + , public IDataShardConflictChecker { +public: + TDataShardUserDb( + TDataShard& self, + NTable::TDatabase& db, + ui64 globalTxId, + const TRowVersion& readVersion, + const TRowVersion& writeVersion, + NMiniKQL::TEngineHostCounters& counters, + TInstant now + ); + +//IDataShardUserDb +public: NTable::EReady SelectRow( const TTableId& tableId, TArrayRef key, @@ -49,26 +80,86 @@ class TDataShardUserDb final : public IDataShardUserDb { NTable::TRowState& row, const TMaybe& readVersion = {}) override; - void AddCommitTxId(ui64 txId, const TRowVersion& commitVersion); + void UpdateRow( + const TTableId& tableId, + const TArrayRef key, + const TArrayRef ops) override; + + void EraseRow( + const TTableId& tableId, + const TArrayRef key) override; + +//IDataShardChangeGroupProvider +public: + std::optional GetCurrentChangeGroup() const override; + ui64 GetChangeGroup() override; - absl::flat_hash_set& GetVolatileReadDependencies() { - return VolatileReadDependencies; - } +//IDataShardConflictChecker +public: + void AddReadConflict(ui64 txId) const override; + void CheckReadConflict(const TRowVersion& rowVersion) const override; + void CheckReadDependency(ui64 txId) override; + + void CheckWriteConflicts(const TTableId& tableId, TArrayRef keyCells) override; + void AddWriteConflict(ui64 txId) override; + void BreakWriteConflict(ui64 txId) override; + +public: + IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId); + TVector GetCollectedChanges() const; + void ResetCollectedChanges(); + +public: + void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion); + bool NeedToReadBeforeWrite(const TTableId& tableId); + void AddCommitTxId(const TTableId& tableId, ui64 txId, const TRowVersion& commitVersion); + ui64 GetWriteTxId(const TTableId& tableId); + + absl::flat_hash_set& 
GetVolatileReadDependencies(); + TVector GetVolatileCommitTxIds() const; + + NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId); + NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId); + + void ResetCounters(); + NMiniKQL::TEngineHostCounters& GetCounters(); + const NMiniKQL::TEngineHostCounters& GetCounters() const; private: - class TReadTxObserver; - NTable::ITransactionMapPtr& GetReadTxMap(); - NTable::ITransactionObserverPtr& GetReadTxObserver(); - void CheckReadDependency(ui64 txId); + static TSmallVec ConvertTableKeys(const TArrayRef key); + + void UpdateRowInt(NTable::ERowOp rowOp, const TTableId& tableId, ui64 localTableId, const TArrayRef key, const TArrayRef ops); private: TDataShard& Self; NTable::TDatabase& Db; - TRowVersion ReadVersion; - NTable::ITransactionMapPtr TxMap; - TIntrusivePtr DynamicTxMap; - NTable::ITransactionObserverPtr TxObserver; - absl::flat_hash_set VolatileReadDependencies; + + TDataShardChangeGroupProvider ChangeGroupProvider; + + absl::flat_hash_map> ChangeCollectors; + + YDB_READONLY_DEF(ui64, GlobalTxId); + YDB_ACCESSOR_DEF(ui64, LockTxId); + YDB_ACCESSOR_DEF(ui32, LockNodeId); + YDB_ACCESSOR_DEF(ui64, VolatileTxId); + YDB_ACCESSOR_DEF(bool, IsImmediateTx); + YDB_ACCESSOR_DEF(bool, IsRepeatableSnapshot); + + YDB_ACCESSOR_DEF(TRowVersion, ReadVersion); + YDB_ACCESSOR_DEF(TRowVersion, WriteVersion); + + YDB_READONLY_DEF(TInstant, Now); + + YDB_READONLY_DEF(absl::flat_hash_set, VolatileReadDependencies); + absl::flat_hash_set CommittedLockChanges; + absl::flat_hash_map> TxMaps; + absl::flat_hash_map TxObservers; + + absl::flat_hash_set VolatileCommitTxIds; + YDB_ACCESSOR_DEF(absl::flat_hash_set, VolatileDependencies); + YDB_ACCESSOR_DEF(bool, VolatileCommitOrdered); + + NMiniKQL::TEngineHostCounters& Counters; }; } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 1f60c13e77b6..5bd48c0a0311 
100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -17,12 +17,12 @@ namespace NKikimr { namespace NDataShard { -TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev) +TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite::TPtr& ev) : Ev(ev) - , EngineBay(self, txc, ctx, stepTxId.ToPair()) + , UserDb(*self, txc.DB, globalTxId, readVersion, writeVersion, EngineHostCounters, TAppData::TimeProvider->Now()) + , KeyValidator(*self, txc.DB) , TabletId(self->TabletID()) , Ctx(ctx) - , StepTxId(stepTxId) , ReceivedAt(receivedAt) , TxSize(0) , ErrCode(NKikimrTxDataShard::TError::OK) @@ -31,23 +31,25 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ComputeTxSize(); NActors::NMemory::TLabel::Add(TxSize); - if (LockTxId()) - EngineBay.SetLockTxId(LockTxId(), LockNodeId()); + if (LockTxId()) { + UserDb.SetLockTxId(LockTxId()); + UserDb.SetLockNodeId(LockNodeId()); + } if (Immediate()) - EngineBay.SetIsImmediateTx(); + UserDb.SetIsImmediateTx(true); NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; - LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString()); + LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << GetRecord().ShortDebugString()); if (!ParseRecord(self->TableInfos)) return; SetTxKeys(RecordOperation().GetColumnIds()); - KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), EngineBay); - EngineBay.MarkTxLoaded(); + 
KqpSetTxLocksKeys(GetKqpLocks(), self->SysLocksTable(), KeyValidator); + KeyValidator.GetInfo().SetLoaded(); } TValidatedWriteTx::~TValidatedWriteTx() { @@ -186,7 +188,7 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro LOG_TRACE_S(Ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << TabletId << ", " << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, *AppData()->TypeRegistry)); TTableRange tableRange(keyCells); - EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); + KeyValidator.AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); } } @@ -204,8 +206,8 @@ bool TValidatedWriteTx::ReValidateKeys() { using EResult = NMiniKQL::IEngineFlat::EResult; - - auto [result, error] = GetKeyValidator().ValidateKeys(); + TKeyValidator::TValidateOptions options(UserDb); + auto [result, error] = GetKeyValidator().ValidateKeys(options); if (result != EResult::Ok) { ErrStr = std::move(error); ErrCode = ConvertErrCode(result); @@ -224,7 +226,6 @@ bool TValidatedWriteTx::CheckCancelled() { } void TValidatedWriteTx::ReleaseTxData() { - EngineBay.DestroyEngine(); IsReleased = true; NActors::NMemory::TLabel::Sub(TxSize); @@ -317,7 +318,8 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransac { if (!WriteTx) { Y_ABORT_UNLESS(Ev); - WriteTx = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev); + auto [readVersion, writeVersion] = self->GetReadWriteVersions(this); + WriteTx = std::make_shared(self, txc, ctx, GetGlobalTxId(), GetReceivedAt(), readVersion, writeVersion, Ev); } return WriteTx; } @@ -379,7 +381,7 @@ void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb) ui64 TWriteOperation::GetMemoryConsumption() const { ui64 res = 0; if (WriteTx) { - res += WriteTx->GetTxSize() + WriteTx->GetMemoryAllocated(); + res += 
WriteTx->GetTxSize(); } if (Ev) { res += sizeof(NEvents::TDataEvents::TEvWrite); @@ -430,7 +432,8 @@ ERestoreDataStatus TWriteOperation::RestoreTxData( LocksCache().Locks[lock.LockId] = lock; bool extractKeys = WriteTx->IsTxInfoLoaded(); - WriteTx = std::make_shared(self, txc, ctx, GetStepOrder(), GetReceivedAt(), Ev); + auto [readVersion, writeVersion] = self->GetReadWriteVersions(this); + WriteTx = std::make_shared(self, txc, ctx, GetGlobalTxId(), GetReceivedAt(), readVersion, writeVersion, Ev); if (WriteTx->Ready() && extractKeys) { WriteTx->ExtractKeys(); } diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index e2db234723f4..fa0bdf2fc951 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -3,6 +3,7 @@ #include "datashard_impl.h" #include "datashard_locks.h" #include "datashard__engine_host.h" +#include "datashard_user_db.h" #include "operation.h" #include @@ -18,8 +19,7 @@ class TValidatedWriteTx: TNonCopyable { public: using TPtr = std::shared_ptr; - TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, const TStepOrder& stepTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite::TPtr& ev); - + TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite::TPtr& ev); ~TValidatedWriteTx(); static constexpr ui64 MaxReorderTxKeys() { @@ -74,69 +74,57 @@ class TValidatedWriteTx: TNonCopyable { return TxInfo().DynKeysCount != 0; } - ui64 GetMemoryAllocated() const { - return EngineBay.GetEngine() ? 
EngineBay.GetEngine()->GetMemoryAllocated() : 0; - } - - NMiniKQL::IEngineFlat* GetEngine() { - return EngineBay.GetEngine(); - } - NMiniKQL::TEngineHost* GetEngineHost() { - return EngineBay.GetEngineHost(); - } - void DestroyEngine() { - EngineBay.DestroyEngine(); - } - TKeyValidator& GetKeyValidator() { - return EngineBay.GetKeyValidator(); + return KeyValidator; } const TKeyValidator& GetKeyValidator() const { - return EngineBay.GetKeyValidator(); + return KeyValidator; } - const NMiniKQL::TEngineHostCounters& GetCounters() { - return EngineBay.GetCounters(); + TDataShardUserDb& GetUserDb() { + return UserDb; } - void ResetCounters() { - EngineBay.ResetCounters(); + + const TDataShardUserDb& GetUserDb() const { + return UserDb; } bool CanCancel(); bool CheckCancelled(); void SetWriteVersion(TRowVersion writeVersion) { - EngineBay.SetWriteVersion(writeVersion); + UserDb.SetWriteVersion(writeVersion); } void SetReadVersion(TRowVersion readVersion) { - EngineBay.SetReadVersion(readVersion); + UserDb.SetReadVersion(readVersion); } + void SetVolatileTxId(ui64 txId) { - EngineBay.SetVolatileTxId(txId); + UserDb.SetVolatileTxId(txId); } void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) { - EngineBay.CommitChanges(tableId, lockId, writeVersion); + UserDb.CommitChanges(tableId, lockId, writeVersion); } TVector GetCollectedChanges() const { - return EngineBay.GetCollectedChanges(); + return UserDb.GetCollectedChanges(); } void ResetCollectedChanges() { - EngineBay.ResetCollectedChanges(); + UserDb.ResetCollectedChanges(); } TVector GetVolatileCommitTxIds() const { - return EngineBay.GetVolatileCommitTxIds(); + return UserDb.GetVolatileCommitTxIds(); } const absl::flat_hash_set& GetVolatileDependencies() const { - return EngineBay.GetVolatileDependencies(); + return UserDb.GetVolatileDependencies(); } - std::optional GetVolatileChangeGroup() const { - return EngineBay.GetVolatileChangeGroup(); + std::optional 
GetVolatileChangeGroup() { + return UserDb.GetChangeGroup(); } bool GetVolatileCommitOrdered() const { - return EngineBay.GetVolatileCommitOrdered(); + return UserDb.GetVolatileCommitOrdered(); } bool IsProposed() const { @@ -171,19 +159,20 @@ class TValidatedWriteTx: TNonCopyable { } const NMiniKQL::IEngineFlat::TValidationInfo& TxInfo() const { - return EngineBay.TxInfo(); + return KeyValidator.GetInfo(); } private: const NEvents::TDataEvents::TEvWrite::TPtr& Ev; - TEngineBay EngineBay; + TDataShardUserDb UserDb; + TKeyValidator KeyValidator; + NMiniKQL::TEngineHostCounters EngineHostCounters; const ui64 TabletId; const TActorContext& Ctx; YDB_ACCESSOR_DEF(TActorId, Source); - YDB_READONLY(TStepOrder, StepTxId, TStepOrder(0, 0)); YDB_READONLY_DEF(TTableId, TableId); YDB_READONLY_DEF(TSerializedCellMatrix, Matrix); YDB_READONLY_DEF(TInstant, ReceivedAt); diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index c832880d361f..1c1a8be5e81b 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -45,7 +45,8 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit { auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(op.Get()); if (eraseTx->HasDependents()) { - TDataShardUserDb userDb(DataShard, txc.DB, readVersion); + NMiniKQL::TEngineHostCounters engineHostCounters; + TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, TAppData::TimeProvider->Now()); TDataShardChangeGroupProvider groupProvider(DataShard, txc.DB, /* distributed tx group */ 0); THolder changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())}; @@ -191,7 +192,7 @@ class TExecuteDistributedEraseTxUnit : public TExecutionUnit { 
DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB); if (!commitAdded && userDb) { // Make sure we see our own changes on further iterations - userDb->AddCommitTxId(globalTxId, writeVersion); + userDb->AddCommitTxId(fullTableId, globalTxId, writeVersion); commitAdded = true; } } else { diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp index 24efb64e74f8..0dd217a4a74d 100644 --- a/ydb/core/tx/datashard/key_validator.cpp +++ b/ydb/core/tx/datashard/key_validator.cpp @@ -1,7 +1,7 @@ #include "key_validator.h" #include "datashard_impl.h" #include "range_ops.h" - +#include "datashard_user_db.h" #include #include @@ -65,7 +65,34 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra Info.SetLoaded(); } -bool TKeyValidator::IsValidKey(TKeyDesc& key) const { +TKeyValidator::TValidateOptions::TValidateOptions(const TDataShardUserDb& userDb) + : LockTxId(userDb.GetLockTxId()) + , LockNodeId(userDb.GetLockNodeId()) + , IsRepeatableSnapshot(userDb.GetIsRepeatableSnapshot()) + , IsImmediateTx(userDb.GetIsImmediateTx()) +{ +} + +bool TKeyValidator::IsValidKey(TKeyDesc& key, const TValidateOptions& opt) const { + if (TSysTables::IsSystemTable(key.TableId)) + return true; + + if (opt.LockTxId) { + // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs + if (key.RowOperation != TKeyDesc::ERowOperation::Read && + (!Self.GetEnableLockedWrites() || !opt.IsImmediateTx || !opt.IsRepeatableSnapshot || !opt.LockNodeId)) + { + key.Status = TKeyDesc::EStatus::OperationNotSupported; + return false; + } + } else if (opt.IsRepeatableSnapshot) { + // Prevent updates/erases in repeatable mvcc txs + if (key.RowOperation != TKeyDesc::ERowOperation::Read) { + key.Status = TKeyDesc::EStatus::OperationNotSupported; + return false; + } + } + ui64 localTableId = Self.GetLocalTableId(key.TableId); return 
NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key); } @@ -84,13 +111,13 @@ ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const { } } -std::tuple TKeyValidator::ValidateKeys() const { +std::tuple TKeyValidator::ValidateKeys(const TValidateOptions& options) const { using EResult = NMiniKQL::IEngineFlat::EResult; for (const auto& validKey : Info.Keys) { TKeyDesc* key = validKey.Key.get(); - bool valid = IsValidKey(*key); + bool valid = IsValidKey(*key, options); if (valid) { auto curSchemaVersion = GetTableSchemaVersion(key->TableId); diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h index b732f1ce1190..0106aa17c962 100644 --- a/ydb/core/tx/datashard/key_validator.h +++ b/ydb/core/tx/datashard/key_validator.h @@ -9,6 +9,7 @@ namespace NKikimr::NDataShard { class TDataShard; +class TDataShardUserDb; class TKeyValidator { public: @@ -22,8 +23,17 @@ class TKeyValidator { void AddReadRange(const TTableId& tableId, const TVector& columns, const TTableRange& range, const TVector& keyTypes, ui64 itemsLimit = 0, bool reverse = false); void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector& keyTypes, const TVector& columns, bool isPureEraseOp); - bool IsValidKey(TKeyDesc& key) const; - std::tuple ValidateKeys() const; + struct TValidateOptions { + ui64 LockTxId; + ui64 LockNodeId; + bool IsRepeatableSnapshot; + bool IsImmediateTx; + + TValidateOptions(const TDataShardUserDb& userDb); + }; + + bool IsValidKey(TKeyDesc& key, const TValidateOptions& options) const; + std::tuple ValidateKeys(const TValidateOptions& options) const; ui64 GetTableSchemaVersion(const TTableId& tableId) const; diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 443442075358..c469d75b0fd0 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -48,47 +48,52 @@ class TWriteUnit : public TExecutionUnit { return; } const ui64 
shadowTableId = self->GetShadowTableId(fullTableId); - const TUserTable& TableInfo_ = *self->GetUserTables().at(tableId); Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId); Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId); + const NTable::TScheme& scheme = txc.DB.GetScheme(); + const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTableId); + auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp); writeTx->SetReadVersion(readVersion); writeTx->SetWriteVersion(writeVersion); - TDataShardUserDb userDb(*self, txc.DB, readVersion); - TDataShardChangeGroupProvider groupProvider(*self, txc.DB); - - TVector keyCells; - TVector commands; + TSmallVec key; + TSmallVec ops; const TSerializedCellMatrix& matrix = writeTx->GetMatrix(); for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx) { - keyCells.clear(); - keyCells.reserve(TableInfo_.KeyColumnIds.size()); + key.clear(); + key.reserve(TableInfo_.KeyColumnIds.size()); for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) { const TCell& cell = matrix.GetCell(rowIdx, keyColIdx); - keyCells.emplace_back(cell); + ui32 keyCol = tableInfo->KeyColumns[keyColIdx]; + if (cell.IsNull()) { + key.emplace_back(); + } else { + NScheme::TTypeInfo vtypeInfo = scheme.GetColumnInfo(tableInfo, keyCol)->PType; + key.emplace_back(cell.Data(), cell.Size(), vtypeInfo); + } } - commands.clear(); + ops.clear(); Y_ABORT_UNLESS(matrix.GetColCount() >= TableInfo_.KeyColumnIds.size()); - commands.reserve(matrix.GetColCount() - TableInfo_.KeyColumnIds.size()); + ops.reserve(matrix.GetColCount() - TableInfo_.KeyColumnIds.size()); for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) { ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); const TCell& cell = matrix.GetCell(rowIdx, valueColIdx); - NMiniKQL::IEngineFlatHost::TUpdateCommand command = {columnTag, TKeyDesc::EColumnOperation::Set, {}, cell}; - 
commands.emplace_back(std::move(command)); + NScheme::TTypeInfo vtypeInfo = scheme.GetColumnInfo(tableInfo, columnTag)->PType; + ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo)); } - writeTx->GetEngineHost()->UpdateRow(fullTableId, keyCells, commands); + writeTx->GetUserDb().UpdateRow(fullTableId, key, ops); } - + self->IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount()); self->IncCounter(COUNTER_WRITE_BYTES, matrix.GetBuffer().size());