From 02a8c1de1ef2ddcb846e5aa37f01df622ae7b712 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 9 Sep 2022 17:02:29 -0700 Subject: [PATCH] [C#] Modified bit in FASTER (#743) * Add KeyHash to XxxInfo * Add SessioonID to DeleteInfo * Modified Bit and API * Minor changes Co-authored-by: Sajjad Rahnama --- cs/src/core/ClientSession/BasicContext.cs | 10 + cs/src/core/ClientSession/ClientSession.cs | 75 +++- cs/src/core/ClientSession/IFasterContext.cs | 7 + cs/src/core/ClientSession/LockableContext.cs | 32 +- .../ClientSession/LockableUnsafeContext.cs | 40 +- cs/src/core/ClientSession/UnsafeContext.cs | 45 +- cs/src/core/Index/Common/RecordInfo.cs | 58 ++- cs/src/core/Index/FASTER/FASTERImpl.cs | 115 ++++- cs/src/core/Index/Interfaces/UpdateInfo.cs | 21 + cs/test/ModifiedBitTests.cs | 412 ++++++++++++++++++ 10 files changed, 755 insertions(+), 60 deletions(-) create mode 100644 cs/test/ModifiedBitTests.cs diff --git a/cs/src/core/ClientSession/BasicContext.cs b/cs/src/core/ClientSession/BasicContext.cs index 7e496af18..406ca7de0 100644 --- a/cs/src/core/ClientSession/BasicContext.cs +++ b/cs/src/core/ClientSession/BasicContext.cs @@ -219,6 +219,16 @@ public ValueTask.DeleteAsyncResult> public ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) => clientSession.DeleteAsync(key, userContext, serialNo, token); + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ResetModified(ref Key key) + => clientSession.ResetModified(ref key); + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool IsModified(Key key) + => clientSession.IsModified(ref key); + /// public void Refresh() => clientSession.Refresh(); diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index b117954c7..e9681ef83 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -659,6 +659,47 @@ public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = def #region Other Operations + /// + public unsafe void ResetModified(ref Key key) + { + UnsafeResumeThread(); + try + { + OperationStatus status; + do + status = fht.InternalModifiedBitOperation(ref key, out _); + while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); + } + finally + { + UnsafeSuspendThread(); + } + } + /// + public unsafe void ResetModified(Key key) => ResetModified(ref key); + + /// + internal unsafe bool IsModified(ref Key key) + { + RecordInfo modifiedInfo; + UnsafeResumeThread(); + try + { + OperationStatus status; + do + status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false); + while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); + } + finally + { + UnsafeSuspendThread(); + } + return modifiedInfo.Modified; + } + + /// + internal unsafe bool IsModified(Key key) => IsModified(ref key); + /// /// Wait for commit of all operations completed until the current point in session. /// Does not itself issue checkpoint/commits. @@ -963,8 +1004,11 @@ public bool SingleWriter(ref Key key, ref Input input, ref Value src, ref Value => _clientSession.functions.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason) - => _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out bool lockFailed) @@ -978,7 +1022,7 @@ public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Va [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); // Note: KeyIndexes do not need notification of in-place updates because the key does not change. return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo); } @@ -1014,8 +1058,11 @@ public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou => _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + } #endregion InitialUpdater #region CopyUpdater @@ -1028,8 +1075,11 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + } #endregion CopyUpdater #region InPlaceUpdater @@ -1043,8 +1093,11 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Ou } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status) - => _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); + private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status) + { + recordInfo.SetDirtyAndModified(); + return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); + } private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out bool lockFailed, out OperationStatus status) { @@ -1084,7 +1137,7 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); _clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo); } @@ -1100,7 +1153,7 @@ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); recordInfo.SetTombstone(); return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo); } diff --git a/cs/src/core/ClientSession/IFasterContext.cs b/cs/src/core/ClientSession/IFasterContext.cs index 6dc2c16ba..d1806b011 100644 --- a/cs/src/core/ClientSession/IFasterContext.cs +++ b/cs/src/core/ClientSession/IFasterContext.cs @@ -510,6 +510,13 @@ ValueTask.ReadAsyncResult> ReadAtAd /// to complete the Upsert operation. Failure to complete the operation will result in leaked allocations. ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default); + + /// + /// Reset the modified bit of a record (for in memory records) + /// + /// + void ResetModified(ref Key key); + /// /// Refresh session epoch and handle checkpointing phases. Used only /// in case of thread-affinitized sessions (async support is disabled). diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs index 6eeda711b..ccfb1dbc9 100644 --- a/cs/src/core/ClientSession/LockableContext.cs +++ b/cs/src/core/ClientSession/LockableContext.cs @@ -524,6 +524,16 @@ public ValueTask.DeleteAsyncResult> public ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) => DeleteAsync(ref key, userContext, serialNo, token); + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ResetModified(ref Key key) + => clientSession.ResetModified(ref key); + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool IsModified(Key key) + => clientSession.IsModified(ref key); + /// public void Refresh() { @@ -591,14 +601,17 @@ public bool SingleWriter(ref Key key, ref Input input, ref Value src, ref Value [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason) - => _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out bool lockFailed) { // Note: KeyIndexes do not need notification of in-place updates because the key does not change. lockFailed = false; - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo); } #endregion IFunctions - Upserts @@ -615,7 +628,10 @@ public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + } #endregion InitialUpdater #region CopyUpdater @@ -629,13 +645,17 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + } #endregion CopyUpdater #region InPlaceUpdater [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out bool lockFailed, out OperationStatus status) { + recordInfo.SetDirtyAndModified(); lockFailed = false; return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); } @@ -654,7 +674,7 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); _clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo); } @@ -662,7 +682,7 @@ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref Delete public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo, out bool lockFailed) { lockFailed = false; - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); recordInfo.Tombstone = true; return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo); } diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index 054f04dc6..fd64bae5b 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -420,6 +420,20 @@ public ValueTask.DeleteAsyncResult> return clientSession.fht.DeleteAsync(FasterSession, clientSession.ctx, ref key, userContext, serialNo, token); } + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ResetModified(ref Key key) + => clientSession.fht.InternalModifiedBitOperation(ref key, out _); + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool IsModified(Key key) + { + + clientSession.fht.InternalModifiedBitOperation(ref key, out var modifiedInfo, false); + return modifiedInfo.Modified; + } + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) @@ -484,14 +498,17 @@ public bool SingleWriter(ref Key key, ref Input input, ref Value src, ref Value [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason) - => _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out bool lockFailed) { // Note: KeyIndexes do not need notification of in-place updates because the key does not change. lockFailed = false; - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo); } #endregion IFunctions - Upserts @@ -507,8 +524,11 @@ public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou => _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + } #endregion InitialUpdater #region CopyUpdater @@ -521,8 +541,11 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + } #endregion CopyUpdater #region InPlaceUpdater @@ -530,6 +553,7 @@ public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, re public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out bool lockFailed, out OperationStatus status) { lockFailed = false; + recordInfo.SetDirtyAndModified(); return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); } @@ -547,7 +571,7 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); _clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo); } @@ -555,7 +579,7 @@ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref Delete public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo, out bool lockFailed) { lockFailed = false; - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); recordInfo.Tombstone = true; return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo); } diff --git a/cs/src/core/ClientSession/UnsafeContext.cs b/cs/src/core/ClientSession/UnsafeContext.cs index 4f72968ba..444b7c8f4 100644 --- a/cs/src/core/ClientSession/UnsafeContext.cs +++ b/cs/src/core/ClientSession/UnsafeContext.cs @@ -355,6 +355,19 @@ public ValueTask.DeleteAsyncResult> public ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) => DeleteAsync(ref key, userContext, serialNo, token); + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ResetModified(ref Key key) + => clientSession.fht.InternalModifiedBitOperation(ref key, out _); + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool IsModified(Key key) + { + clientSession.fht.InternalModifiedBitOperation(ref key, out var modifiedInfo, false); + return modifiedInfo.Modified; + } + /// public void Refresh() { @@ -439,7 +452,10 @@ public bool SingleWriter(ref Key key, ref Input input, ref Value src, ref Value [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason) - => _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out bool lockFailed) @@ -453,7 +469,7 @@ public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Va [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); // Note: KeyIndexes do not need notification of in-place updates because the key does not change. return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo); } @@ -489,8 +505,11 @@ public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou => _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo); + } #endregion InitialUpdater #region CopyUpdater @@ -503,8 +522,11 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) - => _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo) + { + recordInfo.SetDirtyAndModified(); + _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo); + } #endregion CopyUpdater #region InPlaceUpdater @@ -518,8 +540,11 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Ou } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status) - => _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); + private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status) + { + recordInfo.SetDirtyAndModified(); + return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); + } private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out bool lockFailed, out OperationStatus status) { @@ -555,7 +580,7 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); _clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo); } @@ -575,7 +600,7 @@ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo) { - recordInfo.SetDirty(); + recordInfo.SetDirtyAndModified(); recordInfo.SetTombstone(); return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo); } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index 0fa548eae..23f31bbe8 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -11,7 +11,7 @@ namespace FASTER.core { // RecordInfo layout (64 bits total): - // [--][InNewVersion][Filler][Dirty][Tentative][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // [-][Modified][InNewVersion][Filler][Dirty][Tentative][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] // where X = exclusive lock, S = shared lock, R = readcache, A = address, - = unused [StructLayout(LayoutKind.Explicit, Size = 8)] public struct RecordInfo @@ -46,9 +46,9 @@ public struct RecordInfo const int kDirtyBitOffset = kSealedBitOffset + 1; const int kFillerBitOffset = kDirtyBitOffset + 1; const int kInNewVersionBitOffset = kFillerBitOffset + 1; + const int kModifiedBitOffset = kInNewVersionBitOffset + 1; // If these become used, start with the highest number - internal const int kUnusedBit2Offset = kInNewVersionBitOffset + 1; - internal const int kUnusedBit1Offset = kUnusedBit2Offset + 1; + internal const int kUnusedBitOffset = kModifiedBitOffset + 1; const long kTombstoneBitMask = 1L << kTombstoneBitOffset; const long kValidBitMask = 1L << kValidBitOffset; @@ -57,20 +57,21 @@ public struct RecordInfo const long kDirtyBitMask = 1L << kDirtyBitOffset; const long kFillerBitMask = 1L << kFillerBitOffset; const long kInNewVersionBitMask = 1L << kInNewVersionBitOffset; - internal const long kUnused2BitMask = 1L << kUnusedBit2Offset; - internal const long kUnused1BitMask = 1L << kUnusedBit1Offset; + const long kModifiedBitMask = 1L << kModifiedBitOffset; + internal const long kUnused1BitMask = 1L << kUnusedBitOffset; [FieldOffset(0)] private long word; - public static void WriteInfo(ref RecordInfo info, bool inNewVersion, bool tombstone, bool dirty, long previousAddress) + public static void WriteInfo(ref RecordInfo info, bool inNewVersion, bool tombstone, long previousAddress) { info.word = default; info.Tombstone = tombstone; info.SetValid(); - info.Dirty = dirty; + info.Dirty = false; info.PreviousAddress = previousAddress; info.InNewVersion = inNewVersion; + info.Modified = false; } public bool Equals(RecordInfo other) => this.word == other.word; @@ -252,6 +253,35 @@ public bool TryLockExclusiveFromShared(int spinCount = 1) return true; } + + /// + /// Reset modified bit in RecordInfo + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool ResetModifiedAtomic() => ResetModifiedAtomic(spinCount: -1); + + /// + /// Try to reset the modified bit of the RecordInfo + /// + /// Whether the modified bit was reset successfully + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool ResetModifiedAtomic(int spinCount = 1) + { + while (true) + { + long expected_word = word; + if (IsIntermediateWord(expected_word)) + return false; + if ((expected_word & kModifiedBitMask) == 0) + return true; + if (expected_word == Interlocked.CompareExchange(ref word, expected_word & (~kModifiedBitMask), expected_word)) + break; + if (spinCount > 0 && --spinCount <= 0) return false; + Thread.Yield(); + } + return true; + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void CopyLocksFrom(RecordInfo other) { @@ -379,6 +409,16 @@ public bool Dirty } } + public bool Modified + { + get => (word & kModifiedBitMask) > 0; + set + { + if (value) word |= kModifiedBitMask; + else word &= ~kModifiedBitMask; + } + } + public bool Filler { get => (word & kFillerBitMask) > 0; @@ -399,6 +439,9 @@ public bool InNewVersion } } + public void SetDirtyAndModified() => word |= (kDirtyBitMask | kModifiedBitMask); + public void SetModified() => word |= kModifiedBitMask; + public void ClearModified() => word &= (~kModifiedBitMask); public void SetDirty() => word |= kDirtyBitMask; public void SetTombstone() => word |= kTombstoneBitMask; public void SetValid() => word |= kValidBitMask; @@ -446,7 +489,6 @@ public static int GetLength() } internal bool Unused1 { get => (word & kUnused1BitMask) != 0; set => word = value ? word | kUnused1BitMask : word & ~kUnused1BitMask; } - internal bool Unused2 { get => (word & kUnused2BitMask) != 0; set => word = value ? word | kUnused2BitMask : word & ~kUnused2BitMask; } public override string ToString() => word.ToString(); } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 525a0fdf4..d7e8c1e1d 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -463,7 +463,8 @@ internal OperationStatus InternalUpsert( SessionType = fasterSession.SessionType, Version = sessionCtx.version, SessionID = sessionCtx.sessionID, - Address = logicalAddress + Address = logicalAddress, + KeyHash = hash }; if (sessionCtx.phase == Phase.REST) @@ -594,7 +595,7 @@ internal OperationStatus InternalUpsert( if (latchDestination != LatchDestination.CreatePendingContext) { // Immutable region or new record - status = CreateNewRecordUpsert(ref key, ref input, ref value, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, + status = CreateNewRecordUpsert(ref key, ref input, ref value, ref output, hash, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, latestLogicalAddress, prevHighestReadCacheLogicalAddress, lowestReadCachePhysicalAddress, logicalAddress, unsealPhysicalAddress); if (!OperationStatusUtils.IsAppend(status)) { @@ -720,6 +721,7 @@ private LatchDestination AcquireLatchUpsert(FasterExecut /// Input to the operation /// The value to insert /// The result of IFunctions.SingleWriter + /// Hash code of key /// Information about the operation context /// The current session /// The current session context @@ -733,7 +735,7 @@ private LatchDestination AcquireLatchUpsert(FasterExecut /// The logical address of a record that ConcurrentWriter returned false for; we seal it so another operation cannot IPU it, /// transfer locks from it on success, and unseal it on failure /// The physical address of ; passed to avoid needing a virtual GetPhysicalAddress call - private OperationStatus CreateNewRecordUpsert(ref Key key, ref Input input, ref Value value, ref Output output, ref PendingContext pendingContext, FasterSession fasterSession, + private OperationStatus CreateNewRecordUpsert(ref Key key, ref Input input, ref Value value, ref Output output, long keyHash, ref PendingContext pendingContext, FasterSession fasterSession, FasterExecutionContext sessionCtx, HashBucket* bucket, int slot, ushort tag, HashBucketEntry entry, long latestLogicalAddress, long prevHighestReadCacheLogicalAddress, long lowestReadCachePhysicalAddress, long unsealLogicalAddress, long unsealPhysicalAddress) where FasterSession : IFasterSession @@ -746,7 +748,7 @@ private OperationStatus CreateNewRecordUpsert( SessionType = fasterSession.SessionType, Version = sessionCtx.version, SessionID = sessionCtx.sessionID, - Address = logicalAddress + Address = logicalAddress, + KeyHash = hash }; if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress) @@ -1147,13 +1151,13 @@ internal OperationStatus InternalRMW( bool doingCU = logicalAddress >= hlog.HeadAddress && !recordInfo.Tombstone; if (doingCU) { - status = CreateNewRecordRMW(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, + status = CreateNewRecordRMW(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, hash, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, recordInfo, tag, entry, latestLogicalAddress, prevHighestReadCacheLogicalAddress, lowestReadCachePhysicalAddress, logicalAddress, unsealPhysicalAddress, doingCU); } else { Value _temp = default; - status = CreateNewRecordRMW(ref key, ref input, ref _temp, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, + status = CreateNewRecordRMW(ref key, ref input, ref _temp, ref output, hash, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, recordInfo, tag, entry, latestLogicalAddress, prevHighestReadCacheLogicalAddress, lowestReadCachePhysicalAddress, logicalAddress, unsealPhysicalAddress, doingCU); } if (!OperationStatusUtils.IsAppend(status)) @@ -1282,6 +1286,7 @@ private LatchDestination AcquireLatchRMW(PendingContext< /// Input to the operation /// Old value /// The result of IFunctions.SingleWriter + /// Hash code of key /// Information about the operation context /// The current session /// The current session context @@ -1301,7 +1306,7 @@ private LatchDestination AcquireLatchRMW(PendingContext< /// Whether we expect to be doing a CopyUpdate /// Whether we are being called from pending path /// - private OperationStatus CreateNewRecordRMW(ref Key key, ref Input input, ref Value value, ref Output output, ref PendingContext pendingContext, FasterSession fasterSession, + private OperationStatus CreateNewRecordRMW(ref Key key, ref Input input, ref Value value, ref Output output, long keyHash, ref PendingContext pendingContext, FasterSession fasterSession, FasterExecutionContext sessionCtx, HashBucket* bucket, int slot, long logicalAddress, long physicalAddress, RecordInfo srcRecordInfo, ushort tag, HashBucketEntry entry, long latestLogicalAddress, long prevHighestReadCacheLogicalAddress, long lowestReadCachePhysicalAddress, long unsealLogicalAddress, long unsealPhysicalAddress, bool doingCU, bool fromPending = false) @@ -1316,7 +1321,8 @@ private OperationStatus CreateNewRecordRMW( { SessionType = fasterSession.SessionType, Version = sessionCtx.version, - Address = logicalAddress + SessionID = sessionCtx.sessionID, + Address = logicalAddress, + KeyHash = hash }; #region Entry latch operation @@ -1805,11 +1814,13 @@ internal OperationStatus InternalDelete( ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress); RecordInfo.WriteInfo(ref recordInfo, inNewVersion: sessionCtx.InNewVersion, - tombstone: true, dirty: true, + tombstone: true, latestLogicalAddress); recordInfo.Tentative = true; hlog.Serialize(ref key, newPhysicalAddress); + deleteInfo.SessionID = sessionCtx.sessionID; deleteInfo.Address = newLogicalAddress; + deleteInfo.KeyHash = hash; deleteInfo.RecordInfo = recordInfo; if (!fasterSession.SingleDeleter(ref key, ref hlog.GetValue(newPhysicalAddress), ref recordInfo, ref deleteInfo)) @@ -2339,7 +2350,7 @@ internal OperationStatus InternalContinuePendingRMW= hlog.BeginAddress) && !recordInfo.Tombstone, fromPending: true); @@ -2746,7 +2757,8 @@ internal OperationStatus InternalTryCopyToTail + /// if reset is true it simply resets the modified bit for the key + /// if reset is false it only checks whether the key is modified or not + /// + /// key of the record. + /// RecordInfo of the key for checkModified. + /// Operation Type, whether it is reset or check + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal OperationStatus InternalModifiedBitOperation(ref Key key, out RecordInfo modifiedInfo, bool reset = true) + { + Debug.Assert(epoch.ThisInstanceProtected()); + var bucket = default(HashBucket*); + var slot = default(int); + + var hash = comparer.GetHashCode64(ref key); + var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); + +#region Trace back for record in in-memory HybridLog + var entry = default(HashBucketEntry); + FindTag(hash, tag, ref bucket, ref slot, ref entry); + + var logicalAddress = entry.Address; + + + var physicalAddress = hlog.GetPhysicalAddress(logicalAddress); + + if (logicalAddress >= hlog.HeadAddress) + { + if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) + { + logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; + TraceBackForKeyMatch(ref key, + logicalAddress, + hlog.HeadAddress, + out logicalAddress, + out physicalAddress); + } + } #endregion + + OperationStatus status; + modifiedInfo = default; + if (logicalAddress >= hlog.HeadAddress) + { + ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); + if (!recordInfo.IsIntermediate(out status)) + { + if (!reset) + status = OperationStatus.SUCCESS; + else if (!recordInfo.ResetModifiedAtomic()) + return OperationStatus.RETRY_LATER; + } + if (!reset && !recordInfo.Tombstone) + modifiedInfo = recordInfo; + return status; + } + // it the record does not exist we return unmodfied + // if it is in the disk we return modified + if (logicalAddress < hlog.BeginAddress) + modifiedInfo.ResetModifiedAtomic(); + else + modifiedInfo.SetModified(); + // if it is not in the memory we return success + return OperationStatus.SUCCESS; + + } } } diff --git a/cs/src/core/Index/Interfaces/UpdateInfo.cs b/cs/src/core/Index/Interfaces/UpdateInfo.cs index c8c323072..de03a56cb 100644 --- a/cs/src/core/Index/Interfaces/UpdateInfo.cs +++ b/cs/src/core/Index/Interfaces/UpdateInfo.cs @@ -72,6 +72,11 @@ public struct UpsertInfo /// public long Address { get; internal set; } + /// + /// Hash code of key being operated on + /// + public long KeyHash { get; internal set; } + /// /// The ID of session context executing the operation /// @@ -96,6 +101,7 @@ public UpsertInfo(ref RMWInfo rmwInfo) this.Version = rmwInfo.Version; this.SessionID = rmwInfo.SessionID; this.Address = rmwInfo.Address; + this.KeyHash = rmwInfo.KeyHash; this.RecordInfo = default; this.Action = UpsertAction.Default; } @@ -152,6 +158,11 @@ public struct RMWInfo /// public long Address { get; internal set; } + /// + /// Hash code of key being operated on + /// + public long KeyHash { get; internal set; } + /// /// The ID of session context executing the operation /// @@ -203,6 +214,16 @@ public struct DeleteInfo /// public long Address { get; internal set; } + /// + /// Hash code of key being operated on + /// + public long KeyHash { get; internal set; } + + /// + /// The ID of session context executing the operation + /// + public int SessionID { get; internal set; } + /// /// The header of the record. /// diff --git a/cs/test/ModifiedBitTests.cs b/cs/test/ModifiedBitTests.cs new file mode 100644 index 000000000..9946ec36d --- /dev/null +++ b/cs/test/ModifiedBitTests.cs @@ -0,0 +1,412 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using FASTER.core; +using NUnit.Framework; +using FASTER.test.ReadCacheTests; +using System.Threading.Tasks; +using static FASTER.test.TestUtils; +using System.Diagnostics; +using FASTER.test.LockableUnsafeContext; + +namespace FASTER.test.ModifiedTests +{ + + internal class LockableUnsafeComparer : IFasterEqualityComparer + { + internal const int maxSleepMs = 1; + readonly Random rng = new(101); + + public bool Equals(ref int k1, ref int k2) => k1 == k2; + + public long GetHashCode64(ref int k) + { + if (maxSleepMs > 0) + Thread.Sleep(rng.Next(maxSleepMs)); + return Utility.GetHashCode(k); + } + } + + public enum UpdateOp { Upsert, RMW, Delete } + + [TestFixture] + class ModifiedBitTests + { + const int numRecords = 1000; + const int valueMult = 1_000_000; + + + LockableUnsafeComparer comparer; + + private FasterKV fht; + private ClientSession> session; + private IDevice log; + + [SetUp] + public void Setup() + { + log = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.log"), deleteOnClose: false); + + + comparer = new LockableUnsafeComparer(); + fht = new FasterKV(1L << 20, new LogSettings { LogDevice = log, ObjectLogDevice = null, PageSizeBits = 12, MemorySizeBits = 22 }, comparer: comparer, disableLocking: false); + session = fht.For(new SimpleFunctions()).NewSession>(); + } + + [TearDown] + public void TearDown() + { + session?.Dispose(); + session = null; + fht?.Dispose(); + fht = null; + log?.Dispose(); + log = null; + + } + + void Populate() + { + for (int key = 0; key < numRecords; key++) + Assert.IsFalse(session.Upsert(key, key * valueMult).IsPending); + } + + static void AssertLockandModified(LockableUnsafeContext> luContext, int key, bool xlock, bool slock, bool modfied = false) + { + var (isX, isS) = luContext.IsLocked(key); + var isM = luContext.IsModified(key); + Assert.AreEqual(xlock, isX, "xlock mismatch"); + Assert.AreEqual(slock, isS > 0, "slock mismatch"); + Assert.AreEqual(modfied, isM, "modfied mismatch"); + } + + static void AssertLockandModified(LockableContext> luContext, int key, bool xlock, bool slock, bool modfied = false) + { + var (isX, isS) = luContext.IsLocked(key); + var isM = luContext.IsModified(key); + Assert.AreEqual(xlock, isX, "xlock mismatch"); + Assert.AreEqual(slock, isS > 0, "slock mismatch"); + Assert.AreEqual(modfied, isM, "modfied mismatch"); + } + + static void AssertLockandModified(ClientSession> session, int key, bool xlock, bool slock, bool modfied = false) + { + using (var luContext = session.GetLockableUnsafeContext()) + { + luContext.ResumeThread(); + var (isX, isS) = luContext.IsLocked(key); + var isM = luContext.IsModified(key); + Assert.AreEqual(xlock, isX, "xlock mismatch"); + Assert.AreEqual(slock, isS > 0, "slock mismatch"); + Assert.AreEqual(modfied, isM, "Modified mismatch"); + luContext.SuspendThread(); + } + } + + [Test] + [Category(SmokeTestCategory)] + public void LockAndNotModify() + { + Populate(); + Random r = new(100); + int key = r.Next(numRecords); + session.ResetModified(key); + + var LC = session.LockableContext; + LC.Acquire(); + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); + + LC.Lock(key, LockType.Exclusive); + AssertLockandModified(LC, key, xlock: true, slock: false, modfied: false); + + LC.Unlock(key, LockType.Exclusive); + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); + + LC.Lock(key, LockType.Shared); + AssertLockandModified(LC, key, xlock: false, slock: true, modfied: false); + + LC.Unlock(key, LockType.Shared); + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); + LC.Release(); + + } + + + [Test] + [Category(SmokeTestCategory)] + public void ResetModifyForNonExistingKey() + { + Populate(); + int key = numRecords + 100; + session.ResetModified(key); + AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); + } + + + + [Test] + [Category(SmokeTestCategory)] + public void ModifyClientSession([Values(true, false)] bool flushToDisk, [Values] UpdateOp updateOp) + { + Populate(); + + int key = numRecords - 500; + int value = 14; + session.ResetModified(key); + AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); + + if (flushToDisk) + this.fht.Log.FlushAndEvict(wait: true); + + Status status = default; + switch (updateOp) + { + case UpdateOp.Upsert: + status = session.Upsert(key, value); + break; + case UpdateOp.RMW: + status = session.RMW(key, value); + break; + case UpdateOp.Delete: + status = session.Delete(key); + break; + default: + break; + } + if (flushToDisk) + { + switch (updateOp) + { + case UpdateOp.RMW: + Assert.IsTrue(status.IsPending, status.ToString()); + session.CompletePending(wait: true); + break; + default: + Assert.IsTrue(status.NotFound); + break; + } + (status, var _) = session.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); + } + + if (updateOp == UpdateOp.Delete) + AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); + else + AssertLockandModified(session, key, xlock: false, slock: false, modfied: true); + + + } + + [Test] + [Category(SmokeTestCategory)] + public void ModifyLUC([Values(true, false)] bool flushToDisk, [Values] UpdateOp updateOp) + { + Populate(); + + int key = numRecords - 500; + int value = 14; + session.ResetModified(key); + using (var luContext = session.GetLockableUnsafeContext()) + { + luContext.ResumeThread(out var epoch); + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); + luContext.SuspendThread(); + } + + if (flushToDisk) + this.fht.Log.FlushAndEvict(wait: true); + + Status status = default; + using (var luContext = session.GetLockableUnsafeContext()) + { + luContext.ResumeThread(out var epoch); + + switch (updateOp) + { + case UpdateOp.Upsert: + status = luContext.Upsert(key, value); + break; + case UpdateOp.RMW: + status = luContext.RMW(key, value); + break; + case UpdateOp.Delete: + status = luContext.Delete(key); + break; + default: + break; + } + if (flushToDisk) + { + switch (updateOp) + { + case UpdateOp.RMW: + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); + break; + default: + Assert.IsTrue(status.NotFound); + break; + } + (status, var _) = luContext.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); + } + if (updateOp == UpdateOp.Delete) + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); + else + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); + luContext.SuspendThread(); + } + + } + + [Test] + [Category(SmokeTestCategory)] + public void ModifyUC([Values(true, false)] bool flushToDisk, [Values] UpdateOp updateOp) + { + Populate(); + + int key = numRecords - 500; + int value = 14; + session.ResetModified(key); + AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); + + if (flushToDisk) + this.fht.Log.FlushAndEvict(wait: true); + + Status status = default; + using (var unsafeContext = session.GetLockableUnsafeContext()) + { + unsafeContext.ResumeThread(out var epoch); + switch (updateOp) + { + case UpdateOp.Upsert: + status = unsafeContext.Upsert(key, value); + break; + case UpdateOp.RMW: + status = unsafeContext.RMW(key, value); + break; + case UpdateOp.Delete: + status = unsafeContext.Delete(key); + break; + default: + break; + } + if (flushToDisk) + { + switch (updateOp) + { + case UpdateOp.RMW: + Assert.IsTrue(status.IsPending, status.ToString()); + unsafeContext.CompletePending(wait: true); + break; + default: + Assert.IsTrue(status.NotFound); + break; + } + (status, var _) = unsafeContext.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); + } + unsafeContext.SuspendThread(); + } + if (updateOp == UpdateOp.Delete) + AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); + else + AssertLockandModified(session, key, xlock: false, slock: false, modfied: true); + } + + [Test] + [Category(SmokeTestCategory)] + public void ModifyLC([Values(true, false)] bool flushToDisk, [Values] UpdateOp updateOp) + { + Populate(); + + int key = numRecords - 500; + int value = 14; + session.ResetModified(key); + var LC = session.LockableContext; + LC.Acquire(); + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); + + if (flushToDisk) + this.fht.Log.FlushAndEvict(wait: true); + + Status status = default; + + switch (updateOp) + { + case UpdateOp.Upsert: + status = LC.Upsert(key, value); + break; + case UpdateOp.RMW: + status = LC.RMW(key, value); + break; + case UpdateOp.Delete: + status = LC.Delete(key); + break; + default: + break; + } + if (flushToDisk) + { + switch (updateOp) + { + case UpdateOp.RMW: + Assert.IsTrue(status.IsPending, status.ToString()); + LC.CompletePending(wait: true); + break; + default: + Assert.IsTrue(status.NotFound); + break; + } + (status, var _) = LC.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); + } + if (updateOp == UpdateOp.Delete) + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); + else + AssertLockandModified(LC, key, xlock: false, slock: false, modfied: true); + LC.Release(); + } + + + [Test] + [Category(SmokeTestCategory)] + public void CopyToTailTest() + { + Populate(); + fht.Log.FlushAndEvict(wait: true); + + using (var luContext = session.GetLockableUnsafeContext()) + { + int input = 0, output = 0, key = 200; + ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; + + luContext.ResumeThread(); + + // Check Read Copy to Tail resets the modfied + var status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); + + // Check Read Copy to Tail resets the modfied on locked key + key += 10; + luContext.Lock(key, LockType.Exclusive); + status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); + AssertLockandModified(luContext, key, xlock: true, slock: false, modfied: true); + luContext.Unlock(key, LockType.Exclusive); + + + luContext.SuspendThread(); + } + } + + } +}