Skip to content

Commit

Permalink
Merge 8f731a1 into fd4ca58
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 22, 2024
2 parents fd4ca58 + 8f731a1 commit e0ad375
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 29 deletions.
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/datashard_dep_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera

if (lock) {
lock->SetLastOpId(op->GetTxId());
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent() && !lock->IsFrozen()) {
// This lock was cached before, and since we know
// it's persistent, we know it was also frozen
// during that lock caching. Restore the frozen
// flag for this lock.
// Note: this code path is only for older shards
// which didn't persist the frozen flag.
lock->SetFrozen();
}
}
Expand Down
49 changes: 29 additions & 20 deletions ydb/core/tx/datashard/datashard_locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
, CreationTime(TAppData::TimeProvider->Now())
{}

TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
TLockInfo::TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row)
: Locker(locker)
, LockId(lockId)
, LockNodeId(lockNodeId)
, Generation(generation)
, Counter(counter)
, CreationTime(createTs)
, LockId(row.LockId)
, LockNodeId(row.LockNodeId)
, Generation(row.Generation)
, Counter(row.Counter)
, CreationTime(TInstant::MicroSeconds(row.CreateTs))
, Flags(ELockFlags(row.Flags))
, Persistent(true)
{
if (counter == Max<ui64>()) {
if (Counter == Max<ui64>()) {
BreakVersion.emplace(TRowVersion::Min());
}
}
Expand Down Expand Up @@ -145,7 +146,7 @@ void TLockInfo::OnRemoved() {
void TLockInfo::PersistLock(ILocksDb* db) {
Y_ABORT_UNLESS(!IsPersistent());
Y_ABORT_UNLESS(db, "Cannot persist lock without a db");
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds(), ui64(Flags));
Persistent = true;

PersistRanges(db);
Expand Down Expand Up @@ -298,11 +299,11 @@ void TLockInfo::CleanupConflicts() {
}
}

void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
void TLockInfo::RestorePersistentRange(const ILocksDb::TLockRange& rangeRow) {
auto& range = PersistentRanges.emplace_back();
range.Id = rangeId;
range.TableId = tableId;
range.Flags = flags;
range.Id = rangeRow.RangeId;
range.TableId = rangeRow.TableId;
range.Flags = ELockRangeFlags(rangeRow.Flags);

if (!!(range.Flags & ELockRangeFlags::Read)) {
if (ReadTables.insert(range.TableId).second) {
Expand Down Expand Up @@ -334,6 +335,14 @@ void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
VolatileDependencies.insert(txId);
}

void TLockInfo::SetFrozen(ILocksDb* db) {
Y_ABORT_UNLESS(IsPersistent());
Flags |= ELockFlags::Frozen;
if (db) {
db->PersistLockFlags(LockId, ui64(Flags));
}
}

// TTableLocks

void TTableLocks::AddShardLock(TLockInfo* lock) {
Expand Down Expand Up @@ -550,14 +559,14 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
return lock;
}

TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
Y_ABORT_UNLESS(Locks.find(lockId) == Locks.end());
TLockInfo::TPtr TLockLocker::AddLock(const ILocksDb::TLockRow& row) {
Y_ABORT_UNLESS(Locks.find(row.LockId) == Locks.end());

TLockInfo::TPtr lock(new TLockInfo(this, lockId, lockNodeId, generation, counter, createTs));
TLockInfo::TPtr lock(new TLockInfo(this, row));
Y_ABORT_UNLESS(lock->IsPersistent());
Locks[lockId] = lock;
if (lockNodeId) {
PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
Locks[row.LockId] = lock;
if (row.LockNodeId) {
PendingSubscribeLocks.emplace_back(row.LockId, row.LockNodeId);
}
return lock;
}
Expand Down Expand Up @@ -1171,9 +1180,9 @@ bool TSysLocks::Load(ILocksDb& db) {
Locker.Clear();

for (auto& lockRow : rows) {
TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
TLockInfo::TPtr lock = Locker.AddLock(lockRow);
for (auto& rangeRow : lockRow.Ranges) {
lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
lock->RestorePersistentRange(rangeRow);
}
}

Expand Down
35 changes: 29 additions & 6 deletions ydb/core/tx/datashard/datashard_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ILocksDb {
// Persist adding/removing a lock info
virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0;
virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0;
virtual void PersistLockFlags(ui64 lockId, ui64 flags) = 0;
virtual void PersistRemoveLock(ui64 lockId) = 0;

// Persist adding/removing info on locked ranges
Expand Down Expand Up @@ -206,6 +207,23 @@ struct TPendingSubscribeLock {
}
};

// ELockFlags type safe enum

enum class ELockFlags : ui64 {
None = 0,
Frozen = 1,
};

using ELockFlagsRaw = std::underlying_type<ELockFlags>::type;

inline ELockFlags operator|(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) | ELockFlagsRaw(b)); }
inline ELockFlags operator&(ELockFlags a, ELockFlags b) { return ELockFlags(ELockFlagsRaw(a) & ELockFlagsRaw(b)); }
inline ELockFlags& operator|=(ELockFlags& a, ELockFlags b) { return a = a | b; }
inline ELockFlags& operator&=(ELockFlags& a, ELockFlags b) { return a = a & b; }
inline bool operator!(ELockFlags c) { return ELockFlagsRaw(c) == 0; }

// ELockConflictFlags type safe enum

enum class ELockConflictFlags : ui8 {
None = 0,
BreakThemOnOurCommit = 1,
Expand All @@ -220,6 +238,8 @@ inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags
inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; }
inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; }

// ELockRangeFlags type safe enum

enum class ELockRangeFlags : ui8 {
None = 0,
Read = 1,
Expand Down Expand Up @@ -262,7 +282,7 @@ class TLockInfo
using TPtr = TIntrusivePtr<TLockInfo>;

TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row);
~TLockInfo();

bool Empty() const {
Expand Down Expand Up @@ -303,6 +323,9 @@ class TLockInfo
ui32 GetLockNodeId() const { return LockNodeId; }

TInstant GetCreationTime() const { return CreationTime; }

ELockFlags GetFlags() const { return Flags; }

const THashSet<TPathId>& GetReadTables() const { return ReadTables; }
const THashSet<TPathId>& GetWriteTables() const { return WriteTables; }

Expand All @@ -320,7 +343,7 @@ class TLockInfo
void PersistConflicts(ILocksDb* db);
void CleanupConflicts();

void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
void RestorePersistentRange(const ILocksDb::TLockRange& rangeRow);
void RestorePersistentConflict(TLockInfo* otherLock);
void RestorePersistentVolatileDependency(ui64 txId);

Expand All @@ -341,8 +364,8 @@ class TLockInfo
ui64 GetLastOpId() const { return LastOpId; }
void SetLastOpId(ui64 opId) { LastOpId = opId; }

bool IsFrozen() const { return Frozen; }
void SetFrozen() { Frozen = true; }
bool IsFrozen() const { return !!(Flags & ELockFlags::Frozen); }
void SetFrozen(ILocksDb* db = nullptr);

private:
void MakeShardLock();
Expand All @@ -369,6 +392,7 @@ class TLockInfo
ui32 Generation;
ui64 Counter;
TInstant CreationTime;
ELockFlags Flags = ELockFlags::None;
THashSet<TPathId> ReadTables;
THashSet<TPathId> WriteTables;
TVector<TPointKey> Points;
Expand All @@ -386,7 +410,6 @@ class TLockInfo
TVector<TPersistentRange> PersistentRanges;

ui64 LastOpId = 0;
bool Frozen = false;
};

struct TTableLocksReadListTag {};
Expand Down Expand Up @@ -641,7 +664,7 @@ class TLockLocker {
void RemoveBrokenRanges();

TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
TLockInfo::TPtr AddLock(const ILocksDb::TLockRow& row);
void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr);

void SaveBrokenPersistentLocks(ILocksDb* db);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/datashard_locks_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
HasChanges_ = true;
}

void TDataShardLocksDb::PersistLockFlags(ui64 lockId, ui64 flags) {
using Schema = TDataShard::Schema;
NIceDb::TNiceDb db(DB);
db.Table<Schema::Locks>().Key(lockId).Update(
NIceDb::TUpdate<Schema::Locks::Flags>(flags));
HasChanges_ = true;
}

void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
// We remove lock changes unless it's managed by volatile tx manager
bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_locks_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TDataShardLocksDb
// Persist adding/removing a lock info
void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override;
void PersistLockCounter(ui64 lockId, ui64 counter) override;
void PersistLockFlags(ui64 lockId, ui64 flags) override;
void PersistRemoveLock(ui64 lockId) override;

// Persist adding/removing info on locked ranges
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,17 @@ namespace NKqpHelpers {
if (result.result_sets_size() == 0) {
return "<empty>";
}
return FormatResult(result.result_sets(0));
if (result.result_sets_size() == 1) {
return FormatResult(result.result_sets(0));
}
TStringBuilder sb;
for (int i = 0; i < result.result_sets_size(); ++i) {
if (i != 0) {
sb << "\n";
}
sb << FormatResult(result.result_sets(i));
}
return sb;
}

inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
Expand Down
Loading

0 comments on commit e0ad375

Please sign in to comment.