Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix partial distributed commit of uncommitted changes during shard restart race #2169

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/datashard_dep_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera

if (lock) {
lock->SetLastOpId(op->GetTxId());
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent() && !lock->IsFrozen()) {
// This lock was cached before, and since we know
// it's persistent, we know it was also frozen
// during that lock caching. Restore the frozen
// flag for this lock.
// Note: this code path is only for older shards
// which didn't persist the frozen flag.
lock->SetFrozen();
}
}
Expand Down
49 changes: 29 additions & 20 deletions ydb/core/tx/datashard/datashard_locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
, CreationTime(TAppData::TimeProvider->Now())
{}

TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
TLockInfo::TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row)
: Locker(locker)
, LockId(lockId)
, LockNodeId(lockNodeId)
, Generation(generation)
, Counter(counter)
, CreationTime(createTs)
, LockId(row.LockId)
, LockNodeId(row.LockNodeId)
, Generation(row.Generation)
, Counter(row.Counter)
, CreationTime(TInstant::MicroSeconds(row.CreateTs))
, Flags(ELockFlags(row.Flags))
, Persistent(true)
{
if (counter == Max<ui64>()) {
if (Counter == Max<ui64>()) {
BreakVersion.emplace(TRowVersion::Min());
}
}
Expand Down Expand Up @@ -145,7 +146,7 @@ void TLockInfo::OnRemoved() {
void TLockInfo::PersistLock(ILocksDb* db) {
Y_ABORT_UNLESS(!IsPersistent());
Y_ABORT_UNLESS(db, "Cannot persist lock without a db");
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds(), ui64(Flags));
Persistent = true;

PersistRanges(db);
Expand Down Expand Up @@ -298,11 +299,11 @@ void TLockInfo::CleanupConflicts() {
}
}

void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
void TLockInfo::RestorePersistentRange(const ILocksDb::TLockRange& rangeRow) {
auto& range = PersistentRanges.emplace_back();
range.Id = rangeId;
range.TableId = tableId;
range.Flags = flags;
range.Id = rangeRow.RangeId;
range.TableId = rangeRow.TableId;
range.Flags = ELockRangeFlags(rangeRow.Flags);

if (!!(range.Flags & ELockRangeFlags::Read)) {
if (ReadTables.insert(range.TableId).second) {
Expand Down Expand Up @@ -334,6 +335,14 @@ void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
VolatileDependencies.insert(txId);
}

void TLockInfo::SetFrozen(ILocksDb* db) {
Y_ABORT_UNLESS(IsPersistent());
Flags |= ELockFlags::Frozen;
if (db) {
db->PersistLockFlags(LockId, ui64(Flags));
}
}

// TTableLocks

void TTableLocks::AddShardLock(TLockInfo* lock) {
Expand Down Expand Up @@ -550,14 +559,14 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
return lock;
}

TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
Y_ABORT_UNLESS(Locks.find(lockId) == Locks.end());
TLockInfo::TPtr TLockLocker::AddLock(const ILocksDb::TLockRow& row) {
Y_ABORT_UNLESS(Locks.find(row.LockId) == Locks.end());

TLockInfo::TPtr lock(new TLockInfo(this, lockId, lockNodeId, generation, counter, createTs));
TLockInfo::TPtr lock(new TLockInfo(this, row));
Y_ABORT_UNLESS(lock->IsPersistent());
Locks[lockId] = lock;
if (lockNodeId) {
PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
Locks[row.LockId] = lock;
if (row.LockNodeId) {
PendingSubscribeLocks.emplace_back(row.LockId, row.LockNodeId);
}
return lock;
}
Expand Down Expand Up @@ -1171,9 +1180,9 @@ bool TSysLocks::Load(ILocksDb& db) {
Locker.Clear();

for (auto& lockRow : rows) {
TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
TLockInfo::TPtr lock = Locker.AddLock(lockRow);
for (auto& rangeRow : lockRow.Ranges) {
lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
lock->RestorePersistentRange(rangeRow);
}
}

Expand Down
35 changes: 29 additions & 6 deletions ydb/core/tx/datashard/datashard_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ILocksDb {
// Persist adding/removing a lock info
virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0;
virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0;
virtual void PersistLockFlags(ui64 lockId, ui64 flags) = 0;
virtual void PersistRemoveLock(ui64 lockId) = 0;

// Persist adding/removing info on locked ranges
Expand Down Expand Up @@ -206,6 +207,23 @@ struct TPendingSubscribeLock {
}
};

// ELockFlags type safe enum

enum class ELockFlags : ui64 {
None = 0,
Frozen = 1,
};

using ELockFlagsRaw = std::underlying_type<ELockFlags>::type;

inline ELockFlags operator|(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) | ELockFlagsRaw(b)); }
inline ELockFlags operator&(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) & ELockFlagsRaw(b)); }
inline ELockFlags& operator|=(ELockFlags& a, ELockFlags b) { return a = a | b; }
inline ELockFlags& operator&=(ELockFlags& a, ELockFlags b) { return a = a & b; }
inline bool operator!(ELockFlags c) { return ELockFlagsRaw(c) == 0; }

// ELockConflictFlags type safe enum

enum class ELockConflictFlags : ui8 {
None = 0,
BreakThemOnOurCommit = 1,
Expand All @@ -220,6 +238,8 @@ inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags
inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; }
inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; }

// ELockRangeFlags type safe enum

enum class ELockRangeFlags : ui8 {
None = 0,
Read = 1,
Expand Down Expand Up @@ -262,7 +282,7 @@ class TLockInfo
using TPtr = TIntrusivePtr<TLockInfo>;

TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row);
~TLockInfo();

bool Empty() const {
Expand Down Expand Up @@ -303,6 +323,9 @@ class TLockInfo
ui32 GetLockNodeId() const { return LockNodeId; }

TInstant GetCreationTime() const { return CreationTime; }

ELockFlags GetFlags() const { return Flags; }

const THashSet<TPathId>& GetReadTables() const { return ReadTables; }
const THashSet<TPathId>& GetWriteTables() const { return WriteTables; }

Expand All @@ -320,7 +343,7 @@ class TLockInfo
void PersistConflicts(ILocksDb* db);
void CleanupConflicts();

void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
void RestorePersistentRange(const ILocksDb::TLockRange& rangeRow);
void RestorePersistentConflict(TLockInfo* otherLock);
void RestorePersistentVolatileDependency(ui64 txId);

Expand All @@ -341,8 +364,8 @@ class TLockInfo
ui64 GetLastOpId() const { return LastOpId; }
void SetLastOpId(ui64 opId) { LastOpId = opId; }

bool IsFrozen() const { return Frozen; }
void SetFrozen() { Frozen = true; }
bool IsFrozen() const { return !!(Flags & ELockFlags::Frozen); }
void SetFrozen(ILocksDb* db = nullptr);

private:
void MakeShardLock();
Expand All @@ -369,6 +392,7 @@ class TLockInfo
ui32 Generation;
ui64 Counter;
TInstant CreationTime;
ELockFlags Flags = ELockFlags::None;
THashSet<TPathId> ReadTables;
THashSet<TPathId> WriteTables;
TVector<TPointKey> Points;
Expand All @@ -386,7 +410,6 @@ class TLockInfo
TVector<TPersistentRange> PersistentRanges;

ui64 LastOpId = 0;
bool Frozen = false;
};

struct TTableLocksReadListTag {};
Expand Down Expand Up @@ -641,7 +664,7 @@ class TLockLocker {
void RemoveBrokenRanges();

TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
TLockInfo::TPtr AddLock(const ILocksDb::TLockRow& row);
void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr);

void SaveBrokenPersistentLocks(ILocksDb* db);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/datashard_locks_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
HasChanges_ = true;
}

void TDataShardLocksDb::PersistLockFlags(ui64 lockId, ui64 flags) {
using Schema = TDataShard::Schema;
NIceDb::TNiceDb db(DB);
db.Table<Schema::Locks>().Key(lockId).Update(
NIceDb::TUpdate<Schema::Locks::Flags>(flags));
HasChanges_ = true;
}

void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
// We remove lock changes unless it's managed by volatile tx manager
bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_locks_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TDataShardLocksDb
// Persist adding/removing a lock info
void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override;
void PersistLockCounter(ui64 lockId, ui64 counter) override;
void PersistLockFlags(ui64 lockId, ui64 flags) override;
void PersistRemoveLock(ui64 lockId) override;

// Persist adding/removing info on locked ranges
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,17 @@ namespace NKqpHelpers {
if (result.result_sets_size() == 0) {
return "<empty>";
}
return FormatResult(result.result_sets(0));
if (result.result_sets_size() == 1) {
return FormatResult(result.result_sets(0));
}
TStringBuilder sb;
for (int i = 0; i < result.result_sets_size(); ++i) {
if (i != 0) {
sb << "\n";
}
sb << FormatResult(result.result_sets(i));
}
return sb;
}

inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
Expand Down
Loading
Loading