From 8f731a1a1c738f09810ec825eb223562086509ad Mon Sep 17 00:00:00 2001
From: Aleksei Borzenkov
Date: Thu, 22 Feb 2024 09:30:21 +0000
Subject: [PATCH] Fix partial distributed commit of uncommitted changes during
 shard restart race

---
 .../tx/datashard/datashard_dep_tracker.cpp    |   4 +-
 ydb/core/tx/datashard/datashard_locks.cpp     |  49 +++---
 ydb/core/tx/datashard/datashard_locks.h       |  35 +++-
 ydb/core/tx/datashard/datashard_locks_db.cpp  |   8 +
 ydb/core/tx/datashard/datashard_locks_db.h    |   1 +
 .../tx/datashard/datashard_ut_common_kqp.h    |  12 +-
 .../tx/datashard/datashard_ut_snapshot.cpp    | 165 ++++++++++++++++++
 .../datashard/store_and_send_out_rs_unit.cpp  |   4 +-
 8 files changed, 249 insertions(+), 29 deletions(-)

diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index 34d25312795e..a5874bdf6a3b 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -673,11 +673,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
 
         if (lock) {
             lock->SetLastOpId(op->GetTxId());
-            if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
+            if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent() && !lock->IsFrozen()) {
                 // This lock was cached before, and since we know
                 // it's persistent, we know it was also frozen
                 // during that lock caching. Restore the frozen
                 // flag for this lock.
+                // Note: this code path is only for older shards
+                // which didn't persist the frozen flag.
                 lock->SetFrozen();
             }
         }
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index a7e5aecbaa5c..2708f5a8bbdd 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -44,16 +44,17 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
     , CreationTime(TAppData::TimeProvider->Now())
 {}
 
-TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
+TLockInfo::TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row)
     : Locker(locker)
-    , LockId(lockId)
-    , LockNodeId(lockNodeId)
-    , Generation(generation)
-    , Counter(counter)
-    , CreationTime(createTs)
+    , LockId(row.LockId)
+    , LockNodeId(row.LockNodeId)
+    , Generation(row.Generation)
+    , Counter(row.Counter)
+    , CreationTime(TInstant::MicroSeconds(row.CreateTs))
+    , Flags(ELockFlags(row.Flags))
     , Persistent(true)
 {
-    if (counter == Max<ui64>()) {
+    if (Counter == Max<ui64>()) {
         BreakVersion.emplace(TRowVersion::Min());
     }
 }
@@ -145,7 +146,7 @@ void TLockInfo::OnRemoved() {
 void TLockInfo::PersistLock(ILocksDb* db) {
     Y_ABORT_UNLESS(!IsPersistent());
     Y_ABORT_UNLESS(db, "Cannot persist lock without a db");
-    db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
+    db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds(), ui64(Flags));
     Persistent = true;
 
     PersistRanges(db);
@@ -298,11 +299,11 @@ void TLockInfo::CleanupConflicts() {
     }
 }
 
-void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
+void TLockInfo::RestorePersistentRange(const ILocksDb::TLockRange& rangeRow) {
     auto& range = PersistentRanges.emplace_back();
-    range.Id = rangeId;
-    range.TableId = tableId;
-    range.Flags = flags;
+    range.Id = rangeRow.RangeId;
+    range.TableId = rangeRow.TableId;
+    range.Flags = ELockRangeFlags(rangeRow.Flags);
 
     if (!!(range.Flags & ELockRangeFlags::Read)) {
         if (ReadTables.insert(range.TableId).second) {
@@ -334,6 +335,14 @@ void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
     VolatileDependencies.insert(txId);
 }
 
+void TLockInfo::SetFrozen(ILocksDb* db) {
+    Y_ABORT_UNLESS(IsPersistent());
+    Flags |= ELockFlags::Frozen;
+    if (db) {
+        db->PersistLockFlags(LockId, ui64(Flags));
+    }
+}
+
 // TTableLocks
 
 void TTableLocks::AddShardLock(TLockInfo* lock) {
@@ -550,14 +559,14 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
     return lock;
 }
 
-TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
-    Y_ABORT_UNLESS(Locks.find(lockId) == Locks.end());
+TLockInfo::TPtr TLockLocker::AddLock(const ILocksDb::TLockRow& row) {
+    Y_ABORT_UNLESS(Locks.find(row.LockId) == Locks.end());
 
-    TLockInfo::TPtr lock(new TLockInfo(this, lockId, lockNodeId, generation, counter, createTs));
+    TLockInfo::TPtr lock(new TLockInfo(this, row));
     Y_ABORT_UNLESS(lock->IsPersistent());
-    Locks[lockId] = lock;
-    if (lockNodeId) {
-        PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
+    Locks[row.LockId] = lock;
+    if (row.LockNodeId) {
+        PendingSubscribeLocks.emplace_back(row.LockId, row.LockNodeId);
     }
     return lock;
 }
@@ -1171,9 +1180,9 @@ bool TSysLocks::Load(ILocksDb& db) {
     Locker.Clear();
 
     for (auto& lockRow : rows) {
-        TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
+        TLockInfo::TPtr lock = Locker.AddLock(lockRow);
         for (auto& rangeRow : lockRow.Ranges) {
-            lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
+            lock->RestorePersistentRange(rangeRow);
         }
     }
 
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 2d95dc74e748..c0b0aeebd450 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -57,6 +57,7 @@ class ILocksDb {
     // Persist adding/removing a lock info
     virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0;
     virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0;
+    virtual void PersistLockFlags(ui64 lockId, ui64 flags) = 0;
     virtual void PersistRemoveLock(ui64 lockId) = 0;
 
     // Persist adding/removing info on locked ranges
@@ -206,6 +207,23 @@ struct TPendingSubscribeLock {
     }
 };
 
+// ELockFlags type safe enum
+
+enum class ELockFlags : ui64 {
+    None = 0,
+    Frozen = 1,
+};
+
+using ELockFlagsRaw = std::underlying_type<ELockFlags>::type;
+
+inline ELockFlags operator|(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) | ELockFlagsRaw(b)); }
+inline ELockFlags operator&(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) & ELockFlagsRaw(b)); }
+inline ELockFlags& operator|=(ELockFlags& a, ELockFlags b) { return a = a | b; }
+inline ELockFlags& operator&=(ELockFlags& a, ELockFlags b) { return a = a & b; }
+inline bool operator!(ELockFlags c) { return ELockFlagsRaw(c) == 0; }
+
+// ELockConflictFlags type safe enum
+
 enum class ELockConflictFlags : ui8 {
     None = 0,
     BreakThemOnOurCommit = 1,
@@ -220,6 +238,8 @@ inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags
 inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; }
 inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; }
 
+// ELockRangeFlags type safe enum
+
 enum class ELockRangeFlags : ui8 {
     None = 0,
     Read = 1,
@@ -262,7 +282,7 @@ class TLockInfo
     using TPtr = TIntrusivePtr<TLockInfo>;
 
     TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
-    TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
+    TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row);
     ~TLockInfo();
 
     bool Empty() const {
@@ -303,6 +323,9 @@ class TLockInfo
     ui32 GetLockNodeId() const { return LockNodeId; }
     TInstant GetCreationTime() const { return CreationTime; }
+
+    ELockFlags GetFlags() const { return Flags; }
+
     const THashSet<TPathId>& GetReadTables() const { return ReadTables; }
     const THashSet<TPathId>& GetWriteTables() const { return WriteTables; }
@@ -320,7 +343,7 @@ class TLockInfo
     void PersistConflicts(ILocksDb* db);
     void CleanupConflicts();
 
-    void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
+    void RestorePersistentRange(const ILocksDb::TLockRange& rangeRow);
     void RestorePersistentConflict(TLockInfo* otherLock);
     void RestorePersistentVolatileDependency(ui64 txId);
@@ -341,8 +364,8 @@ class TLockInfo
     ui64 GetLastOpId() const { return LastOpId; }
     void SetLastOpId(ui64 opId) { LastOpId = opId; }
 
-    bool IsFrozen() const { return Frozen; }
-    void SetFrozen() { Frozen = true; }
+    bool IsFrozen() const { return !!(Flags & ELockFlags::Frozen); }
+    void SetFrozen(ILocksDb* db = nullptr);
 
 private:
     void MakeShardLock();
@@ -369,6 +392,7 @@ class TLockInfo
     ui32 Generation;
     ui64 Counter;
     TInstant CreationTime;
+    ELockFlags Flags = ELockFlags::None;
     THashSet<TPathId> ReadTables;
     THashSet<TPathId> WriteTables;
     TVector<TPointKey> Points;
@@ -386,7 +410,6 @@ class TLockInfo
     TVector<TPersistentRange> PersistentRanges;
 
     ui64 LastOpId = 0;
-    bool Frozen = false;
 };
 
 struct TTableLocksReadListTag {};
@@ -641,7 +664,7 @@ class TLockLocker {
     void RemoveBrokenRanges();
 
     TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
-    TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
+    TLockInfo::TPtr AddLock(const ILocksDb::TLockRow& row);
     void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr);
 
     void SaveBrokenPersistentLocks(ILocksDb* db);
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index 9285acb6df44..5e5b21c173e9 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -129,6 +129,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
     HasChanges_ = true;
 }
 
+void TDataShardLocksDb::PersistLockFlags(ui64 lockId, ui64 flags) {
+    using Schema = TDataShard::Schema;
+    NIceDb::TNiceDb db(DB);
+    db.Table<Schema::Locks>().Key(lockId).Update(
+        NIceDb::TUpdate<Schema::Locks::Flags>(flags));
+    HasChanges_ = true;
+}
+
 void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
     // We remove lock changes unless it's managed by volatile tx manager
     bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId);
diff --git a/ydb/core/tx/datashard/datashard_locks_db.h b/ydb/core/tx/datashard/datashard_locks_db.h
index 1aba7176ec94..c9da2a157624 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.h
+++ b/ydb/core/tx/datashard/datashard_locks_db.h
@@ -23,6 +23,7 @@ class TDataShardLocksDb
     // Persist adding/removing a lock info
     void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override;
     void PersistLockCounter(ui64 lockId, ui64 counter) override;
+    void PersistLockFlags(ui64 lockId, ui64 flags) override;
     void PersistRemoveLock(ui64 lockId) override;
 
     // Persist adding/removing info on locked ranges
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 888711b79943..d7182e08e3da 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -146,7 +146,17 @@ namespace NKqpHelpers {
         if (result.result_sets_size() == 0) {
            return "";
         }
-        return FormatResult(result.result_sets(0));
+        if (result.result_sets_size() == 1) {
+            return FormatResult(result.result_sets(0));
+        }
+        TStringBuilder sb;
+        for (int i = 0; i < result.result_sets_size(); ++i) {
+            if (i != 0) {
+                sb << "\n";
+            }
+            sb << FormatResult(result.result_sets(i));
+        }
+        return sb;
     }
 
     inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index f34a7e09942b..44e4bf7d555d 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -8,6 +8,7 @@
 #include
 #include   // Y_UNIT_TEST_(TWIN|QUAD)
+#include
 #include
@@ -3823,6 +3824,170 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
         UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
     }
 
+    Y_UNIT_TEST(UncommittedWriteRestartDuringCommit) {
+        NKikimrConfig::TAppConfig app;
+        app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(100)
+            .SetAppConfig(app)
+            // Bug was with non-volatile transactions
+            .SetEnableDataShardVolatileTransactions(false);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+        runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        // Insert some initial data
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"));
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"));
+
+        const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+        TString sessionId, txId;
+
+        // Start inserting a row into table-1
+        Cerr << "... sending initial upsert" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+                UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
+                )")),
+            "");
+
+        // We want to block readsets
+        std::vector<std::unique_ptr<IEventHandle>> readSets;
+        auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+            readSets.emplace_back(ev.Release());
+        });
+
+        // Start committing an additional read/write
+        // Note: select on table-1 flushes accumulated changes
+        // Note: select on table-2 ensures we have an outgoing readset
+        Cerr << "... sending commit request" << Endl;
+        auto commitFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
+            SELECT key, value FROM `/Root/table-1`
+            UNION ALL
+            SELECT key, value FROM `/Root/table-2`
+            ORDER BY key;
+
+            UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
+            )"), sessionId, txId, true /* commitTx */));
+
+        WaitFor(runtime, [&]{ return readSets.size() >= 2; }, "readset exchange");
+        UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
+
+        // We want to block local boot to make sure it stays down during rollback
+        std::vector<std::unique_ptr<IEventHandle>> blockedLocalBoot;
+        auto blockLocalBoot = runtime.AddObserver<TEvLocal::TEvBootTablet>([&](TEvLocal::TEvBootTablet::TPtr& ev) {
+            Cerr << "... blocking TEvLocal::TEvBootTablet" << Endl;
+            blockedLocalBoot.emplace_back(std::move(ev.Release()));
+        });
+
+        // Kill current datashard actor with TEvPoison (so it doesn't have a chance to reply)
+        Cerr << "... sending TEvPoison to " << shards1.at(0) << Endl;
+        ForwardToTablet(runtime, shards1.at(0), sender, new TEvents::TEvPoison);
+
+        // Wait until hive tries to boot a new instance (old instance is dead by that point)
+        WaitFor(runtime, [&]{ return blockedLocalBoot.size() > 0; }, "blocked local boot", 3);
+
+        // Stop blocking and resend readsets
+        blockReadSets.Remove();
+        Cerr << "... resending readsets" << Endl;
+        for (auto& ev : readSets) {
+            runtime.Send(ev.release(), 0, true);
+        }
+        readSets.clear();
+
+        // Wait until commit fails with UNDETERMINED
+        Cerr << "... waiting for commit result" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(commitFuture))),
+            "ERROR: UNDETERMINED");
+
+        // Sleep a little to make sure everything settles
+        Cerr << "... sleeping for 1 second" << Endl;
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // We want to detect a restarting datashard and block its progress queue
+        TActorId shard1actor;
+        std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
+        auto blockProgressQueue = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) {
+            switch (ev->GetTypeRewrite()) {
+                case TEvTablet::TEvBoot::EventType: {
+                    auto* msg = ev->Get<TEvTablet::TEvBoot>();
+                    Cerr << "... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite() << Endl;
+                    if (msg->TabletID == shards1.at(0)) {
+                        shard1actor = ev->GetRecipientRewrite();
+                    }
+                    break;
+                }
+                case EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */: {
+                    if (shard1actor && ev->GetRecipientRewrite() == shard1actor) {
+                        Cerr << "... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite() << Endl;
+                        blockedProgress.emplace_back(ev.Release());
+                        return;
+                    }
+                    break;
+                }
+            }
+        });
+
+        // Unblock local boot
+        blockLocalBoot.Remove();
+        Cerr << "... unblocking local boot" << Endl;
+        for (auto& ev : blockedLocalBoot) {
+            runtime.Send(ev.release(), 0, true);
+        }
+        blockedLocalBoot.clear();
+
+        // Wait until a new instance starts and is blocked at progress queue handling
+        WaitFor(runtime, [&]{ return blockedProgress.size() > 0; }, "blocked progress", 10);
+
+        // Sleep a little to make sure datashard subscribes to lock and handles the response
+        Cerr << "... sleeping for 1 second" << Endl;
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Unblock progress queue and resend blocked messages
+        Cerr << "... resending progress queue" << Endl;
+        blockProgressQueue.Remove();
+        for (auto& ev : blockedProgress) {
+            runtime.Send(ev.release(), 0, true);
+        }
+        blockedProgress.clear();
+
+        // Sleep a little to make sure everything settles
+        Cerr << "... sleeping for 1 second" << Endl;
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Now make a read query, we must not observe partial commit
+        Cerr << "... checking final table state" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                UNION ALL
+                SELECT key, value FROM `/Root/table-2`
+                ORDER BY key;
+                )")),
+            "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+            "{ items { uint32_value: 4 } items { uint32_value: 40 } }");
+    }
+
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
index 71b429094464..e809c4209ce4 100644
--- a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
@@ -1,6 +1,7 @@
 #include "datashard_impl.h"
 #include "datashard_pipeline.h"
 #include "execution_unit_ctors.h"
+#include "datashard_locks_db.h"
 
 namespace NKikimr {
 namespace NDataShard {
@@ -62,7 +63,8 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
             ui64 lockId = pr.first;
             auto lock = DataShard.SysLocksTable().GetRawLock(lockId, TRowVersion::Min());
             if (lock && lock->IsPersistent()) {
-                lock->SetFrozen();
+                TDataShardLocksDb locksDb(DataShard, txc);
+                lock->SetFrozen(&locksDb);
             }
         }
         tx->MarkLocksStored();
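
Editor's note, not part of the patch: the standalone sketch below distills the mechanism the change introduces, for readers outside the YDB tree. TFakeLocksDb, TLock and TLockRow are simplified stand-ins (not the real ILocksDb, TLockInfo and ILocksDb::TLockRow, which carry many more fields); only the flag round trip is modeled. The point is that setting the frozen flag both updates the in-memory state and writes the flags word through the locks db, so a shard rebuilt after a restart reloads the lock with the frozen bit intact.

    // Standalone illustration of persisting and restoring a lock's flags word.
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    enum class ELockFlags : uint64_t { None = 0, Frozen = 1 };
    inline ELockFlags operator|(ELockFlags a, ELockFlags b) { return ELockFlags(uint64_t(a) | uint64_t(b)); }
    inline ELockFlags operator&(ELockFlags a, ELockFlags b) { return ELockFlags(uint64_t(a) & uint64_t(b)); }

    // On-disk row for a lock: just the id and the raw flags word.
    struct TLockRow { uint64_t LockId = 0; uint64_t Flags = 0; };

    // Stand-in for the shard's persistent locks table.
    struct TFakeLocksDb {
        std::unordered_map<uint64_t, TLockRow> Rows;
        void PersistLockFlags(uint64_t lockId, uint64_t flags) {
            Rows[lockId].LockId = lockId;
            Rows[lockId].Flags = flags;
        }
    };

    struct TLock {
        uint64_t LockId = 0;
        ELockFlags Flags = ELockFlags::None;

        explicit TLock(uint64_t lockId) : LockId(lockId) {}
        // Restore path: a restarted shard rebuilds the lock from its persisted row,
        // including the frozen bit.
        explicit TLock(const TLockRow& row) : LockId(row.LockId), Flags(ELockFlags(row.Flags)) {}

        bool IsFrozen() const { return uint64_t(Flags & ELockFlags::Frozen) != 0; }

        // Setting the flag also writes it through the db, so it survives a restart.
        void SetFrozen(TFakeLocksDb* db) {
            Flags = Flags | ELockFlags::Frozen;
            if (db) {
                db->PersistLockFlags(LockId, uint64_t(Flags));
            }
        }
    };

    int main() {
        TFakeLocksDb db;
        TLock lock(42);
        lock.SetFrozen(&db);            // frozen bit persisted before the "restart"
        TLock restored(db.Rows.at(42)); // what a rebooted shard would load
        std::cout << "restored IsFrozen = " << restored.IsFrozen() << std::endl; // prints 1
    }

In the patch itself this round trip is TLockInfo::SetFrozen(ILocksDb*) persisting through TDataShardLocksDb::PersistLockFlags, and TSysLocks::Load with TLockLocker::AddLock(row) restoring the flags, which is the behaviour the new UncommittedWriteRestartDuringCommit test exercises across a shard restart.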