diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 45f6147add02..d78e5f7126ef 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1,5 +1,6 @@ #include "datashard_impl.h" #include "datashard_txs.h" +#include "datashard_locks_db.h" #include "probes.h" #include @@ -1620,7 +1621,9 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD newTableInfo->StatsUpdateInProgress = false; newTableInfo->StatsNeedUpdate = true; - RemoveUserTable(prevId); + TDataShardLocksDb locksDb(*this, txc); + + RemoveUserTable(prevId, &locksDb); AddUserTable(newId, newTableInfo); for (auto& [_, record] : ChangesQueue) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index ba8a9484bdbe..d60e6f2ff76a 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1607,10 +1607,10 @@ class TDataShard return nullptr; } - void RemoveUserTable(const TPathId& tableId) { - TableInfos.erase(tableId.LocalPathId); - SysLocks.RemoveSchema(tableId); + void RemoveUserTable(const TPathId& tableId, ILocksDb* locksDb) { + SysLocks.RemoveSchema(tableId, locksDb); Pipeline.GetDepTracker().RemoveSchema(tableId); + TableInfos.erase(tableId.LocalPathId); } void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) { diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 2708f5a8bbdd..4291d8c53e16 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -624,12 +624,23 @@ void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableIn table->UpdateKeyColumnsTypes(tableInfo.KeyColumnTypes); } -void TLockLocker::RemoveSchema(const TPathId& tableId) { +void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) { + // Make sure all persistent locks are removed from the database + for (auto& pr : Locks) { + if (pr.second->IsPersistent()) { + pr.second->PersistRemoveLock(db); + } + pr.second->OnRemoved(); + } + Tables.erase(tableId); Y_ABORT_UNLESS(Tables.empty()); Locks.clear(); ShardLocks.clear(); + ExpireQueue.Clear(); BrokenLocks.Clear(); + BrokenPersistentLocks.Clear(); + BrokenLocksCount_ = 0; CleanupPending.clear(); CleanupCandidates.clear(); PendingSubscribeLocks.clear(); diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index c0b0aeebd450..f12eab077eb2 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -601,7 +601,7 @@ class TLockLocker { } void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo); - void RemoveSchema(const TPathId& tableId); + void RemoveSchema(const TPathId& tableId, ILocksDb* db); bool ForceShardLock(const TPathId& tableId) const; bool ForceShardLock(const TIntrusiveList& readTables) const; @@ -840,8 +840,8 @@ class TSysLocks { Locker.UpdateSchema(tableId, tableInfo); } - void RemoveSchema(const TPathId& tableId) { - Locker.RemoveSchema(tableId); + void RemoveSchema(const TPathId& tableId, ILocksDb* db) { + Locker.RemoveSchema(tableId, db); } TVector ApplyLocks(); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index ea3c67b171e7..7e0ae912a3c7 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -4896,6 +4896,83 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { int32_value: 2 } items { int32_value: 20 } }"); } + Y_UNIT_TEST(UncommittedChangesRenameTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(100) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + UNIT_ASSERT_VALUES_EQUAL( + KqpSchemeExec(runtime, R"( + CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key)); + )"), + "SUCCESS"); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table1` (key, value) VALUES (2, 22);"); + + TString sessionId = CreateSessionRPC(runtime); + TString txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, R"( + UPSERT INTO `/Root/table1` (key, value) VALUES (1, 11), (3, 33); + SELECT key, value FROM `/Root/table1` ORDER BY key; + )"), + "{ items { int32_value: 1 } items { int32_value: 11 } }, " + "{ items { int32_value: 2 } items { int32_value: 22 } }, " + "{ items { int32_value: 3 } items { int32_value: 33 } }"); + + auto shards = GetTableShards(server, sender, "/Root/table1"); + auto tableId1 = ResolveTableId(server, sender, "/Root/table1"); + + // Check shard has open transactions + { + runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId1.PathId)); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + + WaitTxNotification(server, sender, AsyncMoveTable(server, "/Root/table1", "/Root/table1moved")); + auto tableId2 = ResolveTableId(server, sender, "/Root/table1moved"); + + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Check shard doesn't have open transactions + { + runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId)); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + + RebootTablet(runtime, shards.at(0), sender); + + // The original table was removed + // We must not be able to commit the transaction + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1"), + "ERROR: ABORTED"); + + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Check shard doesn't have open transactions + { + runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId)); + auto ev = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + } + } } // namespace NKikimr