From 3796074d6986b77ff07fd1c7ad03d71d98cbba33 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Fri, 23 Sep 2022 03:38:22 -0700 Subject: [PATCH 1/3] Make *UnsafeContext a struct and acquire on demand (like LockableContext); change Resume/SuspendThread naming to Begin/EndUnsafe and add Begin/EndLockable --- cs/benchmark/FasterSpanByteYcsbBenchmark.cs | 14 +- cs/benchmark/FasterYcsbBenchmark.cs | 19 +- cs/src/core/ClientSession/BasicContext.cs | 3 + cs/src/core/ClientSession/ClientSession.cs | 91 ++-- cs/src/core/ClientSession/ILockableContext.cs | 10 + cs/src/core/ClientSession/IUnsafeContext.cs | 6 +- cs/src/core/ClientSession/LockableContext.cs | 52 +-- .../ClientSession/LockableUnsafeContext.cs | 69 ++- cs/src/core/ClientSession/UnsafeContext.cs | 52 +-- cs/stress/SessionWrapper.cs | 26 +- cs/stress/TestLoader.cs | 8 +- cs/test/AdvancedLockTests.cs | 20 +- cs/test/DisposeTests.cs | 2 +- cs/test/LockableUnsafeContextTests.cs | 413 ++++++++++-------- cs/test/ModifiedBitTests.cs | 203 ++++----- cs/test/ReadCacheChainTests.cs | 17 +- cs/test/StateMachineTests.cs | 72 ++- cs/test/ThreadSession.cs | 21 +- cs/test/UnsafeContextTests.cs | 73 ++-- 19 files changed, 560 insertions(+), 611 deletions(-) diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index ef05b9907..034862a06 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -125,8 +125,8 @@ private void RunYcsbUnsafeContext(int thread_idx) #endif var session = store.For(functions).NewSession<FunctionsSB>(); - var uContext = session.GetUnsafeContext(); - uContext.ResumeThread(); + var uContext = session.UnsafeContext; + uContext.BeginUnsafe(); try { @@ -203,10 +203,9 @@ private void RunYcsbUnsafeContext(int thread_idx) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } - uContext.Dispose(); session.Dispose(); sw.Stop(); @@ -479,8 +478,8 @@ private void SetupYcsbUnsafeContext(int thread_idx) waiter.Wait(); var session = store.For(functions).NewSession<FunctionsSB>(); - var uContext = session.GetUnsafeContext(); - uContext.ResumeThread(); + var uContext = session.UnsafeContext; + uContext.BeginUnsafe(); #if DASHBOARD var tstart = Stopwatch.GetTimestamp(); @@ -531,9 +530,8 @@ private void SetupYcsbUnsafeContext(int thread_idx) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } - uContext.Dispose(); session.Dispose(); } diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 1beffb39c..711fa2a3c 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -121,8 +121,8 @@ private void RunYcsbUnsafeContext(int thread_idx) #endif var session = store.For(functions).NewSession<Functions>(); - var uContext = session.GetUnsafeContext(); - uContext.ResumeThread(); + var uContext = session.UnsafeContext; + uContext.BeginUnsafe(); try { @@ -199,10 +199,9 @@ private void RunYcsbUnsafeContext(int thread_idx) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } - uContext.Dispose(); session.Dispose(); sw.Stop(); @@ -319,7 +318,8 @@ internal unsafe (double, double) Run(TestLoader testLoader) if (testLoader.Options.LockImpl == (int)LockImpl.Manual) { session = store.For(functions).NewSession<Functions>(); - luContext = session.GetLockableUnsafeContext(); + luContext = session.LockableUnsafeContext; + luContext.BeginLockable(); Console.WriteLine("Taking 2 manual locks"); luContext.Lock(xlock.key, xlock.kind); @@ -436,7 +436,7 @@ internal unsafe (double, double) Run(TestLoader testLoader) { luContext.Unlock(xlock.key, xlock.kind); luContext.Unlock(slock.key, slock.kind); - luContext.Dispose(); + luContext.EndLockable(); session.Dispose(); } @@ -467,8 +467,8 @@ private void SetupYcsbUnsafeContext(int thread_idx) waiter.Wait(); var session = store.For(functions).NewSession<Functions>(); - var uContext = session.GetUnsafeContext(); - uContext.ResumeThread(); + var uContext = session.UnsafeContext; + uContext.BeginUnsafe(); #if DASHBOARD var tstart = Stopwatch.GetTimestamp(); @@ -518,9 +518,8 @@ private void SetupYcsbUnsafeContext(int thread_idx) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } - uContext.Dispose(); session.Dispose(); } diff --git a/cs/src/core/ClientSession/BasicContext.cs b/cs/src/core/ClientSession/BasicContext.cs index 4e92b0b5d..1c9826e02 100644 --- a/cs/src/core/ClientSession/BasicContext.cs +++ b/cs/src/core/ClientSession/BasicContext.cs @@ -16,6 +16,9 @@ namespace FASTER.core { readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession; + /// <summary>Indicates whether this struct has been initialized</summary> + public bool IsNull => this.clientSession is null; + internal BasicContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession) { this.clientSession = clientSession; diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index a9b413cc4..5cebbb86d 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -35,8 +36,8 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions> internal readonly InternalFasterSession FasterSession; - UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext; - LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext; + readonly UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext; + readonly LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext; readonly LockableContext<Key, Value, Input, Output, Context, Functions> lContext; readonly BasicContext<Key, Value, Input, Output, Context, Functions> bContext; @@ -49,32 +50,34 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions> internal ulong sharedLockCount; internal ulong exclusiveLockCount; - bool isAcquired; + bool isAcquiredLockable; - internal void Acquire() + internal void AcquireLockable() { - CheckNotAcquired(); + CheckIsNotAcquiredLockable(); fht.IncrementNumLockingSessions(); - isAcquired = true; + isAcquiredLockable = true; } - internal void Release() + internal void ReleaseLockable(string methodName) { - CheckAcquired(); - isAcquired = false; + CheckIsAcquiredLockable(); + if (TotalLockCount > 0) + throw new FasterException($"{methodName} called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks"); + isAcquiredLockable = false; fht.DecrementNumLockingSessions(); } - internal void CheckAcquired() + internal void CheckIsAcquiredLockable() { - if (!isAcquired) - throw new FasterException("Method call on not-acquired Context"); + if (!isAcquiredLockable) + throw new FasterException("Lockable method call when BeginLockable has not been called"); } - void CheckNotAcquired() + void CheckIsNotAcquiredLockable() { - if (isAcquired) - throw new FasterException("Method call on acquired Context"); + if (isAcquiredLockable) + throw new FasterException("BeginLockable cannot be called twice (call EndLockable first)"); } internal ClientSession( @@ -86,6 +89,8 @@ internal ClientSession( { this.lContext = new(this); this.bContext = new(this); + this.luContext = new(this); + this.uContext = new(this); this.loggerFactory = loggerFactory; this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode():X8}"); @@ -199,22 +204,12 @@ public void Dispose() /// <summary> /// Return a new interface to Faster operations that supports manual epoch control. /// </summary> - public UnsafeContext<Key, Value, Input, Output, Context, Functions> GetUnsafeContext() - { - this.uContext ??= new(this); - this.uContext.Acquire(); - return this.uContext; - } + public UnsafeContext<Key, Value, Input, Output, Context, Functions> UnsafeContext => uContext; /// <summary> /// Return a new interface to Faster operations that supports manual locking and epoch control. /// </summary> - public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> GetLockableUnsafeContext() - { - this.luContext ??= new(this); - this.luContext.Acquire(); - return this.luContext; - } + public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> LockableUnsafeContext => luContext; /// <summary> /// Return a session wrapper that supports manual locking. @@ -655,40 +650,51 @@ public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = def #region Other Operations /// <inheritdoc/> - public unsafe void ResetModified(ref Key key) + public void ResetModified(ref Key key) { UnsafeResumeThread(); try { - OperationStatus status; - do - status = fht.InternalModifiedBitOperation(ref key, out _); - while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); + UnsafeResetModified(ref key); } finally { UnsafeSuspendThread(); } } + + internal void UnsafeResetModified(ref Key key) + { + OperationStatus status; + do + status = fht.InternalModifiedBitOperation(ref key, out _); + while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); + } + /// <inheritdoc/> public unsafe void ResetModified(Key key) => ResetModified(ref key); /// <inheritdoc/> - internal unsafe bool IsModified(ref Key key) + internal bool IsModified(ref Key key) { - RecordInfo modifiedInfo; UnsafeResumeThread(); try { - OperationStatus status; - do - status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false); - while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); + return UnsafeIsModified(ref key); } finally { UnsafeSuspendThread(); } + } + + internal bool UnsafeIsModified(ref Key key) + { + RecordInfo modifiedInfo; + OperationStatus status; + do + status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false); + while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession)); return modifiedInfo.Modified; } @@ -848,6 +854,8 @@ public IFasterScanIterator<Key, Value> Iterate(long untilAddress = -1) [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void UnsafeResumeThread() { + // We do not track any "acquired" state here; if someone mixes calls between safe and unsafe contexts, they will + // get the "trying to acquire already-acquired epoch" error. fht.epoch.Resume(); fht.InternalRefresh(ctx, FasterSession); } @@ -856,8 +864,11 @@ internal void UnsafeResumeThread() /// Suspend session on current thread /// </summary> [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void UnsafeSuspendThread() - => fht.epoch.Suspend(); + internal void UnsafeSuspendThread() + { + Debug.Assert(fht.epoch.ThisInstanceProtected()); + fht.epoch.Suspend(); + } void IClientSession.AtomicSwitch(long version) { diff --git a/cs/src/core/ClientSession/ILockableContext.cs b/cs/src/core/ClientSession/ILockableContext.cs index b9cd6c01b..2a7b11b9e 100644 --- a/cs/src/core/ClientSession/ILockableContext.cs +++ b/cs/src/core/ClientSession/ILockableContext.cs @@ -9,6 +9,16 @@ namespace FASTER.core /// <typeparam name="TKey"></typeparam> public interface ILockableContext<TKey> { + /// <summary> + /// Begins a series of lock operations on possibly multiple keys; call before any locks are taken. + /// </summary> + void BeginLockable(); + + /// <summary> + /// Ends a series of lock operations on possibly multiple keys; call after all locks are released. + /// </summary> + void EndLockable(); + /// <summary> /// Lock the key with the specified <paramref name="lockType"/>, waiting until it is acquired /// </summary> diff --git a/cs/src/core/ClientSession/IUnsafeContext.cs b/cs/src/core/ClientSession/IUnsafeContext.cs index 0b7727e3b..7f07326de 100644 --- a/cs/src/core/ClientSession/IUnsafeContext.cs +++ b/cs/src/core/ClientSession/IUnsafeContext.cs @@ -10,13 +10,13 @@ namespace FASTER.core public interface IUnsafeContext { /// <summary> - /// Resume session on current thread. IMPORTANT: Call SuspendThread before any async op. + /// Resume session on current thread. IMPORTANT: Call <see cref="EndUnsafe"/> before any async op. /// </summary> - void ResumeThread(); + void BeginUnsafe(); /// <summary> /// Suspend session on current thread /// </summary> - void SuspendThread(); + void EndUnsafe(); } } diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs index a69a54e91..aa57c535c 100644 --- a/cs/src/core/ClientSession/LockableContext.cs +++ b/cs/src/core/ClientSession/LockableContext.cs @@ -17,59 +17,31 @@ namespace FASTER.core readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession; readonly InternalFasterSession FasterSession; + /// <summary>Indicates whether this struct has been initialized</summary> + public bool IsNull => this.clientSession is null; + internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession) { this.clientSession = clientSession; FasterSession = new InternalFasterSession(clientSession); } - /// <inheritdoc/> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void UnsafeResumeThread() - { - clientSession.CheckAcquired(); - Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); - clientSession.UnsafeResumeThread(); - } + #region Begin/EndLockable /// <inheritdoc/> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void UnsafeSuspendThread() - { - clientSession.CheckAcquired(); - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - clientSession.UnsafeSuspendThread(); - } - - #region Acquire and Dispose + public void BeginLockable() => clientSession.AcquireLockable(); - /// <summary> - /// Acquire a lockable context - /// </summary> - public void Acquire() - { - clientSession.Acquire(); - } + /// <inheritdoc/> + public void EndLockable() => clientSession.ReleaseLockable("LockableContext.EndLockable"); - /// <summary> - /// Release a lockable context; does not actually dispose of anything - /// </summary> - public void Release() - { - if (clientSession.fht.epoch.ThisInstanceProtected()) - throw new FasterException("Releasing LockableContext with a protected epoch; must call UnsafeSuspendThread"); - if (clientSession.TotalLockCount > 0) - throw new FasterException($"Releasing LockableContext with locks held: {clientSession.sharedLockCount} shared locks, {clientSession.exclusiveLockCount} exclusive locks"); - clientSession.Release(); - } - #endregion Acquire and Dispose + #endregion Begin/EndLockable #region Key Locking /// <inheritdoc/> public unsafe void Lock(ref Key key, LockType lockType) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); clientSession.UnsafeResumeThread(); try @@ -100,7 +72,7 @@ public unsafe void Lock(ref Key key, LockType lockType) /// <inheritdoc/> public void Unlock(ref Key key, LockType lockType) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); clientSession.UnsafeResumeThread(); try @@ -131,7 +103,7 @@ public void Unlock(ref Key key, LockType lockType) /// <inheritdoc/> public (bool exclusive, byte shared) IsLocked(ref Key key) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); clientSession.UnsafeResumeThread(); try @@ -643,8 +615,8 @@ public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, re [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out bool lockFailed, out OperationStatus status) { - recordInfo.SetDirtyAndModified(); lockFailed = false; + recordInfo.SetDirtyAndModified(); return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status); } diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index bd80bdda1..e676129ea 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using System; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -12,60 +11,48 @@ namespace FASTER.core /// <summary> /// Faster Context implementation that allows manual control of record locking and epoch management. For advanced use only. /// </summary> - public sealed class LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> : IFasterContext<Key, Value, Input, Output, Context>, ILockableContext<Key>, IUnsafeContext, IDisposable + public readonly struct LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> : IFasterContext<Key, Value, Input, Output, Context>, ILockableContext<Key>, IUnsafeContext where Functions : IFunctions<Key, Value, Input, Output, Context> { readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession; internal readonly InternalFasterSession FasterSession; + /// <summary>Indicates whether this struct has been initialized</summary> + public bool IsNull => this.clientSession is null; + internal LockableUnsafeContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession) { this.clientSession = clientSession; FasterSession = new InternalFasterSession(clientSession); } + #region Begin/EndUnsafe + /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void ResumeThread() - { - clientSession.CheckAcquired(); - clientSession.UnsafeResumeThread(); - } + public void BeginUnsafe() => clientSession.UnsafeResumeThread(); /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SuspendThread() - { - clientSession.CheckAcquired(); - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - clientSession.UnsafeSuspendThread(); - } + public void EndUnsafe() => clientSession.UnsafeSuspendThread(); - #region Acquire and Dispose - internal void Acquire() - { - clientSession.Acquire(); - } + #endregion Begin/EndUnsafe - /// <summary> - /// Does not actually dispose of anything; asserts the epoch has been suspended - /// </summary> - public void Dispose() - { - if (clientSession.fht.epoch.ThisInstanceProtected()) - throw new FasterException("Disposing LockableUnsafeContext with a protected epoch; must call UnsafeSuspendThread"); - if (clientSession.TotalLockCount > 0) - throw new FasterException($"Disposing LockableUnsafeContext with locks held: {clientSession.sharedLockCount} shared locks, {clientSession.exclusiveLockCount} exclusive locks"); - clientSession.Release(); - } - #endregion Acquire and Dispose + #region Begin/EndLockable + + /// <inheritdoc/> + public void BeginLockable() => clientSession.AcquireLockable(); + + /// <inheritdoc/> + public void EndLockable() => clientSession.ReleaseLockable("LockableUnsafeContext.EndLockable"); + #endregion Begin/EndLockable #region Key Locking /// <inheritdoc/> public unsafe void Lock(ref Key key, LockType lockType) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for Lock()"); LockOperation lockOp = new(LockOperationType.Lock, lockType); @@ -89,7 +76,7 @@ public unsafe void Lock(ref Key key, LockType lockType) /// <inheritdoc/> public void Unlock(ref Key key, LockType lockType) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for Unlock()"); LockOperation lockOp = new(LockOperationType.Unlock, lockType); @@ -113,7 +100,7 @@ public void Unlock(ref Key key, LockType lockType) /// <inheritdoc/> public (bool exclusive, byte shared) IsLocked(ref Key key) { - clientSession.CheckAcquired(); + clientSession.CheckIsAcquiredLockable(); Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for IsLocked()"); LockOperation lockOp = new(LockOperationType.IsLocked, LockType.None); @@ -411,22 +398,18 @@ public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void ResetModified(ref Key key) - => clientSession.fht.InternalModifiedBitOperation(ref key, out _); + public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) + => DeleteAsync(ref key, userContext, serialNo, token); /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool IsModified(Key key) - { - - clientSession.fht.InternalModifiedBitOperation(ref key, out var modifiedInfo, false); - return modifiedInfo.Modified; - } + public void ResetModified(ref Key key) + => clientSession.UnsafeResetModified(ref key); /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default) - => DeleteAsync(ref key, userContext, serialNo, token); + internal bool IsModified(Key key) + => clientSession.UnsafeIsModified(ref key); /// <inheritdoc/> public void Refresh() diff --git a/cs/src/core/ClientSession/UnsafeContext.cs b/cs/src/core/ClientSession/UnsafeContext.cs index b606cf6b3..253736e6b 100644 --- a/cs/src/core/ClientSession/UnsafeContext.cs +++ b/cs/src/core/ClientSession/UnsafeContext.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using System; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -12,19 +11,14 @@ namespace FASTER.core /// <summary> /// Faster Operations implementation that allows manual control of record epoch management. For advanced use only. /// </summary> - public sealed class UnsafeContext<Key, Value, Input, Output, Context, Functions> : IFasterContext<Key, Value, Input, Output, Context>, IUnsafeContext, IDisposable + public readonly struct UnsafeContext<Key, Value, Input, Output, Context, Functions> : IFasterContext<Key, Value, Input, Output, Context>, IUnsafeContext where Functions : IFunctions<Key, Value, Input, Output, Context> { readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession; - internal readonly InternalFasterSession FasterSession; - bool isAcquired; - void CheckAcquired() - { - if (!isAcquired) - throw new FasterException("Method call on not-acquired UnsafeContext"); - } + /// <summary>Indicates whether this struct has been initialized</summary> + public bool IsNull => this.clientSession is null; internal UnsafeContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession) { @@ -32,40 +26,17 @@ internal UnsafeContext(ClientSession<Key, Value, Input, Output, Context, Functio FasterSession = new InternalFasterSession(clientSession); } + #region Begin/EndUnsafe + /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void ResumeThread() - { - CheckAcquired(); - clientSession.UnsafeResumeThread(); - } + public void BeginUnsafe() => clientSession.UnsafeResumeThread(); /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SuspendThread() - { - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - clientSession.UnsafeSuspendThread(); - } - - #region Acquire and Dispose - internal void Acquire() - { - if (this.isAcquired) - throw new FasterException("Trying to acquire an already-acquired UnsafeContext"); - this.isAcquired = true; - } + public void EndUnsafe() => clientSession.UnsafeSuspendThread(); - /// <summary> - /// Does not actually dispose of anything; asserts the epoch has been suspended - /// </summary> - public void Dispose() - { - if (clientSession.fht.epoch.ThisInstanceProtected()) - throw new FasterException("Disposing UnsafeContext with a protected epoch; must call UnsafeSuspendThread"); - this.isAcquired = false; - } - #endregion Acquire and Dispose + #endregion Begin/EndUnsafe #region IFasterContext @@ -347,15 +318,12 @@ public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] public void ResetModified(ref Key key) - => clientSession.fht.InternalModifiedBitOperation(ref key, out _); + => clientSession.UnsafeResetModified(ref key); /// <inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] internal bool IsModified(Key key) - { - clientSession.fht.InternalModifiedBitOperation(ref key, out var modifiedInfo, false); - return modifiedInfo.Modified; - } + => clientSession.UnsafeIsModified(ref key); /// <inheritdoc/> public void Refresh() diff --git a/cs/stress/SessionWrapper.cs b/cs/stress/SessionWrapper.cs index ab2402db4..eed169c21 100644 --- a/cs/stress/SessionWrapper.cs +++ b/cs/stress/SessionWrapper.cs @@ -29,12 +29,12 @@ internal void PrepareTest(ClientSession<TKey, TValue, TInput, TOutput, Empty, IF { this.session = session; if (testLoader.WantLUC(rng)) - this.luContext = session.GetLockableUnsafeContext(); + this.luContext = session.LockableUnsafeContext; } internal ClientSession<TKey, TValue, TInput, TOutput, Empty, IFunctions<TKey, TValue, TInput, TOutput, Empty>> FkvSession => this.session; - internal bool IsLUC => this.luContext is not null; + internal bool IsLUC => !this.luContext.IsNull; #region Read @@ -81,7 +81,7 @@ private void ReadLUC(int keyOrdinal, int keyCount, TKey[] keys) { try { - luContext.ResumeThread(); // Retain epoch control through lock, the operation, and unlock + luContext.BeginUnsafe(); // Retain epoch control through lock, the operation, and unlock testLoader.MaybeLock(luContext, keyCount, keys, isRmw: false, isAsyncTest: false); TOutput output = default; var status = luContext.Read(ref keys[0], ref output); @@ -101,7 +101,7 @@ private void ReadLUC(int keyOrdinal, int keyCount, TKey[] keys) finally { testLoader.MaybeUnlock(luContext, keyCount, keys, isRmw: false, isAsyncTest: false); - luContext.SuspendThread(); + luContext.EndUnsafe(); } } @@ -169,7 +169,7 @@ private void RMWLUC(int keyOrdinal, int keyCount, TKey[] keys, TInput input) { try { - luContext.ResumeThread(); // Retain epoch control through lock, the operation, and unlock + luContext.BeginUnsafe(); // Retain epoch control through lock, the operation, and unlock testLoader.MaybeLock(luContext, keyCount, keys, isRmw: true, isAsyncTest: false); TOutput output = default; var status = luContext.RMW(ref keys[0], ref input, ref output); @@ -188,7 +188,7 @@ private void RMWLUC(int keyOrdinal, int keyCount, TKey[] keys, TInput input) finally { testLoader.MaybeUnlock(luContext, keyCount, keys, isRmw: true, isAsyncTest: false); - luContext.SuspendThread(); + luContext.EndUnsafe(); } } @@ -237,13 +237,13 @@ private void UpsertLUC(ref TKey key, ref TValue value) { try { - luContext.ResumeThread(); + luContext.BeginUnsafe(); var status = luContext.Upsert(ref key, ref value); Assert.IsFalse(status.IsPending, status.ToString()); } finally { - luContext.SuspendThread(); + luContext.EndUnsafe(); } } @@ -274,13 +274,13 @@ private void DeleteLUC(TKey key) { try { - luContext.ResumeThread(); + luContext.BeginUnsafe(); var status = luContext.Delete(ref key); Assert.IsFalse(status.IsPending, status.ToString()); } finally { - luContext.SuspendThread(); + luContext.EndUnsafe(); } } @@ -288,11 +288,7 @@ private void DeleteLUC(TKey key) public void Dispose() { - if (luContext is not null) - { - luContext.Dispose(); - luContext = default; - } + luContext = default; session.Dispose(); session = default; } diff --git a/cs/stress/TestLoader.cs b/cs/stress/TestLoader.cs index b9739da2a..be04f0baa 100644 --- a/cs/stress/TestLoader.cs +++ b/cs/stress/TestLoader.cs @@ -226,7 +226,7 @@ internal void MaybeLock<TKey>(ILockableContext<TKey> luContext, int keyCount, TK var uContext = luContext as IUnsafeContext; if (isAsyncTest) - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { for (var ii = 0; ii < keyCount; ++ii) @@ -240,7 +240,7 @@ internal void MaybeLock<TKey>(ILockableContext<TKey> luContext, int keyCount, TK finally { if (isAsyncTest) - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -251,7 +251,7 @@ internal void MaybeUnlock<TKey>(ILockableContext<TKey> luContext, int keyCount, var uContext = luContext as IUnsafeContext; if (isAsyncTest) - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { for (var ii = 0; ii < keyCount; ++ii) @@ -265,7 +265,7 @@ internal void MaybeUnlock<TKey>(ILockableContext<TKey> luContext, int keyCount, finally { if (isAsyncTest) - uContext.SuspendThread(); + uContext.EndUnsafe(); } } diff --git a/cs/test/AdvancedLockTests.cs b/cs/test/AdvancedLockTests.cs index 3ef121d71..ca7911341 100644 --- a/cs/test/AdvancedLockTests.cs +++ b/cs/test/AdvancedLockTests.cs @@ -230,43 +230,43 @@ public async ValueTask NoLocksAfterRestoreTest([Values] CheckpointType checkpoin { // Populate and Lock using var session = fht1.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; var firstKeyEnd = incremental ? numKeys / 2 : numKeys; - luContext.ResumeThread(); + luContext.BeginUnsafe(); for (int key = 0; key < firstKeyEnd; key++) { luContext.Upsert(key, getValue(key)); if ((key % lockKeyInterval) == 0) luContext.Lock(key, getLockType(key)); } - luContext.SuspendThread(); + luContext.EndUnsafe(); fht1.TryInitiateFullCheckpoint(out token, checkpointType); await fht1.CompleteCheckpointAsync(); if (incremental) { - luContext.ResumeThread(); + luContext.BeginUnsafe(); for (int key = firstKeyEnd; key < numKeys; key++) { luContext.Upsert(key, getValue(key)); if ((key % lockKeyInterval) == 0) luContext.Lock(key, getLockType(key)); } - luContext.SuspendThread(); + luContext.EndUnsafe(); var _result1 = fht1.TryInitiateHybridLogCheckpoint(out var _token1, checkpointType, tryIncremental: true); await fht1.CompleteCheckpointAsync(); } - luContext.ResumeThread(); + luContext.BeginUnsafe(); for (int key = 0; key < numKeys; key += lockKeyInterval) { // This also verifies the locks are there--otherwise (in Debug) we'll AssertFail trying to unlock an unlocked record luContext.Unlock(key, getLockType(key)); } - luContext.SuspendThread(); + luContext.EndUnsafe(); } if (syncMode == SyncMode.Async) @@ -276,15 +276,15 @@ public async ValueTask NoLocksAfterRestoreTest([Values] CheckpointType checkpoin { // Ensure there are no locks using var session = fht2.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); for (int key = 0; key < numKeys; key++) { (bool isExclusive, byte isShared) = luContext.IsLocked(key); Assert.IsFalse(isExclusive); Assert.AreEqual(0, isShared); } - luContext.SuspendThread(); + luContext.EndUnsafe(); } } } diff --git a/cs/test/DisposeTests.cs b/cs/test/DisposeTests.cs index ce0aa03db..84cb53613 100644 --- a/cs/test/DisposeTests.cs +++ b/cs/test/DisposeTests.cs @@ -468,7 +468,7 @@ public void DisposeSingleDeleterTest([Values(FlushMode.ReadOnly, FlushMode.OnDis DoFlush(flushMode); // This is necessary for FlushMode.ReadOnly to test the readonly range in Delete() (otherwise we can't test SingleDeleter there) - using var luc = fht.NewSession(new DisposeFunctionsNoSync()).GetLockableUnsafeContext(); + var luc = fht.NewSession(new DisposeFunctionsNoSync()).LockableUnsafeContext; void DoDelete(DisposeFunctions functions) { diff --git a/cs/test/LockableUnsafeContextTests.cs b/cs/test/LockableUnsafeContextTests.cs index dba5484fa..718c9fb29 100644 --- a/cs/test/LockableUnsafeContextTests.cs +++ b/cs/test/LockableUnsafeContextTests.cs @@ -194,8 +194,8 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) var sw = Stopwatch.StartNew(); // Copied from UnsafeContextTests to test Async. - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); try { @@ -209,9 +209,9 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) } else { - luContext.SuspendThread(); + luContext.EndUnsafe(); var status = (await luContext.UpsertAsync(ref key1, ref value)).Complete(); - luContext.ResumeThread(); + luContext.BeginUnsafe(); Assert.IsFalse(status.IsPending); } } @@ -232,9 +232,9 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) } else { - luContext.SuspendThread(); + luContext.EndUnsafe(); (status, output) = (await luContext.ReadAsync(ref key1, ref input)).Complete(); - luContext.ResumeThread(); + luContext.BeginUnsafe(); } if (!status.IsPending) { @@ -247,9 +247,9 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) } else { - luContext.SuspendThread(); + luContext.EndUnsafe(); await luContext.CompletePendingAsync(); - luContext.ResumeThread(); + luContext.BeginUnsafe(); } // Shift head and retry - should not find in main memory now @@ -273,9 +273,9 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) } else { - luContext.SuspendThread(); + luContext.EndUnsafe(); outputs = await luContext.CompletePendingWithOutputsAsync(); - luContext.ResumeThread(); + luContext.BeginUnsafe(); } int count = 0; @@ -289,7 +289,7 @@ public async Task TestShiftHeadAddress([Values] SyncMode syncMode) } finally { - luContext.SuspendThread(); + luContext.EndUnsafe(); } } @@ -312,110 +312,109 @@ public void InMemorySimpleLockTxnTest([Values] ResultLockTarget resultLockTarget Status status; Dictionary<int, LockType> locks = new(); - using (var luContext = session.GetLockableUnsafeContext()) + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); + try { - luContext.ResumeThread(); + { // key scope + // Get initial source values + int key = 24; + luContext.Lock(key, LockType.Shared); + AssertIsLocked(luContext, key, xlock: false, slock: true); + locks[key] = LockType.Shared; - try - { - { // key scope - // Get initial source values - int key = 24; - luContext.Lock(key, LockType.Shared); - AssertIsLocked(luContext, key, xlock: false, slock: true); - locks[key] = LockType.Shared; - - key = 51; - luContext.Lock(key, LockType.Shared); - locks[key] = LockType.Shared; - AssertIsLocked(luContext, key, xlock: false, slock: true); - - // Lock destination value. - luContext.Lock(resultKey, LockType.Exclusive); - locks[resultKey] = LockType.Exclusive; - AssertIsLocked(luContext, resultKey, xlock: true, slock: false); - - // Re-get source values, to verify (e.g. they may be in readcache now). - // We just locked this above, but for FlushMode.OnDisk it will be in the LockTable and will still be PENDING. - status = luContext.Read(24, out var value24); - if (flushMode == FlushMode.OnDisk) - { - if (status.IsPending) - { - luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); - Assert.True(completedOutputs.Next()); - value24 = completedOutputs.Current.Output; - Assert.False(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); - Assert.Less(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); - Assert.False(completedOutputs.Next()); - completedOutputs.Dispose(); - } - } - else - { - Assert.IsFalse(status.IsPending, status.ToString()); - } + key = 51; + luContext.Lock(key, LockType.Shared); + locks[key] = LockType.Shared; + AssertIsLocked(luContext, key, xlock: false, slock: true); - status = luContext.Read(51, out var value51); - if (flushMode == FlushMode.OnDisk) - { - if (status.IsPending) - { - luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); - Assert.True(completedOutputs.Next()); - value51 = completedOutputs.Current.Output; - Assert.False(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); - Assert.Less(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); - Assert.False(completedOutputs.Next()); - completedOutputs.Dispose(); - } - } - else + // Lock destination value. + luContext.Lock(resultKey, LockType.Exclusive); + locks[resultKey] = LockType.Exclusive; + AssertIsLocked(luContext, resultKey, xlock: true, slock: false); + + // Re-get source values, to verify (e.g. they may be in readcache now). + // We just locked this above, but for FlushMode.OnDisk it will be in the LockTable and will still be PENDING. + status = luContext.Read(24, out var value24); + if (flushMode == FlushMode.OnDisk) + { + if (status.IsPending) { - Assert.IsFalse(status.IsPending, status.ToString()); + luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); + Assert.True(completedOutputs.Next()); + value24 = completedOutputs.Current.Output; + Assert.False(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); + Assert.Less(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); + Assert.False(completedOutputs.Next()); + completedOutputs.Dispose(); } + } + else + { + Assert.IsFalse(status.IsPending, status.ToString()); + } - // Set the phase to Phase.INTERMEDIATE to test the non-Phase.REST blocks - session.ctx.phase = phase; - int dummyInOut = 0; - status = useRMW - ? luContext.RMW(ref resultKey, ref expectedResult, ref dummyInOut, out RecordMetadata recordMetadata) - : luContext.Upsert(ref resultKey, ref dummyInOut, ref expectedResult, ref dummyInOut, out recordMetadata); - if (flushMode == FlushMode.OnDisk) + status = luContext.Read(51, out var value51); + if (flushMode == FlushMode.OnDisk) + { + if (status.IsPending) { - if (status.IsPending) - { - luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); - Assert.True(completedOutputs.Next()); - resultValue = completedOutputs.Current.Output; - Assert.True(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); - Assert.AreEqual(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); - Assert.False(completedOutputs.Next()); - completedOutputs.Dispose(); - } + luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); + Assert.True(completedOutputs.Next()); + value51 = completedOutputs.Current.Output; + Assert.False(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); + Assert.Less(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); + Assert.False(completedOutputs.Next()); + completedOutputs.Dispose(); } - else + } + else + { + Assert.IsFalse(status.IsPending, status.ToString()); + } + + // Set the phase to Phase.INTERMEDIATE to test the non-Phase.REST blocks + session.ctx.phase = phase; + int dummyInOut = 0; + status = useRMW + ? luContext.RMW(ref resultKey, ref expectedResult, ref dummyInOut, out RecordMetadata recordMetadata) + : luContext.Upsert(ref resultKey, ref dummyInOut, ref expectedResult, ref dummyInOut, out recordMetadata); + if (flushMode == FlushMode.OnDisk) + { + if (status.IsPending) { - Assert.IsFalse(status.IsPending, status.ToString()); + luContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); + Assert.True(completedOutputs.Next()); + resultValue = completedOutputs.Current.Output; + Assert.True(completedOutputs.Current.RecordMetadata.RecordInfo.IsLockedExclusive); + Assert.AreEqual(0, completedOutputs.Current.RecordMetadata.RecordInfo.NumLockedShared); + Assert.False(completedOutputs.Next()); + completedOutputs.Dispose(); } - - // Reread the destination to verify - status = luContext.Read(resultKey, out resultValue); + } + else + { Assert.IsFalse(status.IsPending, status.ToString()); - Assert.AreEqual(expectedResult, resultValue); } - foreach (var key in locks.Keys.OrderBy(key => -key)) - luContext.Unlock(key, locks[key]); - } - catch (Exception) - { - ClearCountsOnError(session); - throw; - } - finally - { - luContext.SuspendThread(); + + // Reread the destination to verify + status = luContext.Read(resultKey, out resultValue); + Assert.IsFalse(status.IsPending, status.ToString()); + Assert.AreEqual(expectedResult, resultValue); } + foreach (var key in locks.Keys.OrderBy(key => -key)) + luContext.Unlock(key, locks[key]); + } + catch (Exception) + { + ClearCountsOnError(session); + throw; + } + finally + { + luContext.EndLockable(); + luContext.EndUnsafe(); } // Verify reading the destination from the full session. @@ -441,8 +440,9 @@ public void InMemoryLongLockTest([Values] ResultLockTarget resultLockTarget, [Va var useRMW = updateOp == UpdateOp.RMW; Status status; - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -501,7 +501,8 @@ public void InMemoryLongLockTest([Values] ResultLockTarget resultLockTarget, [Va } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } // Verify from the full session. @@ -529,38 +530,38 @@ public void InMemoryDeleteTest([Values] ResultLockTarget resultLockTarget, [Valu int resultKey = resultLockTarget == ResultLockTarget.LockTable ? numRecords + 1 : 75; Status status; - using (var luContext = session.GetLockableUnsafeContext()) + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); + + try { - luContext.ResumeThread(); + // Lock destination value. + luContext.Lock(resultKey, LockType.Exclusive); + locks[resultKey] = LockType.Exclusive; + AssertIsLocked(luContext, resultKey, xlock: true, slock: false); - try - { - // Lock destination value. - luContext.Lock(resultKey, LockType.Exclusive); - locks[resultKey] = LockType.Exclusive; - AssertIsLocked(luContext, resultKey, xlock: true, slock: false); + // Set the phase to Phase.INTERMEDIATE to test the non-Phase.REST blocks + session.ctx.phase = phase; + status = luContext.Delete(ref resultKey); + Assert.IsFalse(status.IsPending, status.ToString()); - // Set the phase to Phase.INTERMEDIATE to test the non-Phase.REST blocks - session.ctx.phase = phase; - status = luContext.Delete(ref resultKey); - Assert.IsFalse(status.IsPending, status.ToString()); + // Reread the destination to verify + status = luContext.Read(resultKey, out var _); + Assert.IsFalse(status.Found, status.ToString()); - // Reread the destination to verify - status = luContext.Read(resultKey, out var _); - Assert.IsFalse(status.Found, status.ToString()); - - foreach (var key in locks.Keys.OrderBy(key => key)) - luContext.Unlock(key, locks[key]); - } - catch (Exception) - { - ClearCountsOnError(session); - throw; - } - finally - { - luContext.SuspendThread(); - } + foreach (var key in locks.Keys.OrderBy(key => key)) + luContext.Unlock(key, locks[key]); + } + catch (Exception) + { + ClearCountsOnError(session); + throw; + } + finally + { + luContext.EndLockable(); + luContext.EndUnsafe(); } // Verify reading the destination from the full session. @@ -588,8 +589,9 @@ void runLockThread(int tid) Random rng = new(tid + 101); using var localSession = fht.For(new LockableUnsafeFunctions()).NewSession<LockableUnsafeFunctions>(); - using var luContext = localSession.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = localSession.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); for (var iteration = 0; iteration < numIterations; ++iteration) { @@ -605,7 +607,8 @@ void runLockThread(int tid) locks.Clear(); } - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } void runOpThread(int tid) @@ -688,11 +691,12 @@ public void TransferFromLockTableToCTTTest() fht.Log.FlushAndEvict(wait: true); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; int input = 0, output = 0, key = transferToExistingKey; ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail}; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { AddLockTableEntry(luContext, key, immutable: false); @@ -710,7 +714,8 @@ public void TransferFromLockTableToCTTTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -722,10 +727,11 @@ public void TransferFromEvictionToLockTable() Populate(); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; int key = transferToExistingKey; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { luContext.Lock(ref key, LockType.Exclusive); @@ -746,7 +752,8 @@ public void TransferFromEvictionToLockTable() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -768,8 +775,9 @@ public void TransferFromLockTableToUpsertTest([Values] ChainTests.RecordRegion r PopulateAndEvict(recordRegion == ChainTests.RecordRegion.Immutable); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); int key = -1; try @@ -798,7 +806,8 @@ public void TransferFromLockTableToUpsertTest([Values] ChainTests.RecordRegion r } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -810,8 +819,9 @@ public void TransferFromLockTableToRMWTest([Values] ChainTests.RecordRegion reco PopulateAndEvict(recordRegion == ChainTests.RecordRegion.Immutable); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); int key = -1; try @@ -840,7 +850,8 @@ public void TransferFromLockTableToRMWTest([Values] ChainTests.RecordRegion reco } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -852,8 +863,9 @@ public void TransferFromLockTableToDeleteTest([Values] ChainTests.RecordRegion r PopulateAndEvict(recordRegion == ChainTests.RecordRegion.Immutable); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); int key = -1; try @@ -887,7 +899,8 @@ public void TransferFromLockTableToDeleteTest([Values] ChainTests.RecordRegion r } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -898,14 +911,15 @@ public void LockAndUnlockInLockTableOnlyTest() { // For this, just don't load anything, and it will happen in lock table. using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; Dictionary<int, LockType> locks = new(); var rng = new Random(101); foreach (var key in Enumerable.Range(0, numRecords).Select(ii => rng.Next(numRecords))) locks[key] = (key & 1) == 0 ? LockType.Exclusive : LockType.Shared; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -935,7 +949,8 @@ public void LockAndUnlockInLockTableOnlyTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } Assert.IsFalse(fht.LockTable.IsActive); @@ -950,13 +965,12 @@ public void TransferFromReadOnlyToUpdateRecordTest([Values] UpdateOp updateOp) Populate(); this.fht.Log.ShiftReadOnlyAddress(this.fht.Log.TailAddress, wait: true); - using var luContext = session.GetLockableUnsafeContext(); - const int key = 42; - static int getValue(int key) => key + valueMult; - luContext.ResumeThread(); + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -988,7 +1002,8 @@ public void TransferFromReadOnlyToUpdateRecordTest([Values] UpdateOp updateOp) } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -1001,8 +1016,8 @@ public void LockNewRecordCompeteWithUpdateTest([Values(LockOperationType.Lock, L using var updateSession = fht.NewSession(new SimpleFunctions<int, int>()); using var lockSession = fht.NewSession(new SimpleFunctions<int, int>()); - using var updateLuContext = updateSession.GetLockableUnsafeContext(); - using var lockLuContext = lockSession.GetLockableUnsafeContext(); + var updateLuContext = updateSession.LockableUnsafeContext; + var lockLuContext = lockSession.LockableUnsafeContext; LockType getLockType(int key) => ((key & 1) == 0) ? LockType.Exclusive : LockType.Shared; int getValue(int key) => key + valueMult; @@ -1018,6 +1033,9 @@ public void LockNewRecordCompeteWithUpdateTest([Values(LockOperationType.Lock, L // Now populate the main area of the log. Populate(); + lockLuContext.BeginUnsafe(); + lockLuContext.BeginLockable(); + HashSet<int> locks = new(); void lockKey(int key) { @@ -1030,7 +1048,6 @@ void unlockKey(int key) locks.Remove(key); } - lockLuContext.ResumeThread(); try { @@ -1077,14 +1094,18 @@ void unlockKey(int key) } finally { - lockLuContext.SuspendThread(); + lockLuContext.EndLockable(); + lockLuContext.EndUnsafe(); } void locker(int key) { try { - lockLuContext.ResumeThread(); + // Begin/EndLockable are called outside this function; we could not EndLockable in here as the lock lifetime is beyond that. + // (BeginLockable's scope is the session; BeginUnsafe's scope is the thread. The session is still "mono-threaded" here because + // only one thread at a time is making calls on it.) + lockLuContext.BeginUnsafe(); if (lockOp == LockOperationType.Lock) lockKey(key); else @@ -1097,13 +1118,13 @@ void locker(int key) } finally { - lockLuContext.SuspendThread(); + lockLuContext.EndUnsafe(); } } void updater(int key) { - updateLuContext.ResumeThread(); + updateLuContext.BeginUnsafe(); try { @@ -1127,7 +1148,7 @@ void updater(int key) } finally { - updateLuContext.SuspendThread(); + updateLuContext.EndUnsafe(); } } } @@ -1140,12 +1161,13 @@ public void MultiSharedLockTest() Populate(); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; const int key = 42; var maxLocks = 63; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -1172,7 +1194,8 @@ public void MultiSharedLockTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -1184,14 +1207,15 @@ public void EvictFromMainLogToLockTableTest() Populate(); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; Dictionary<int, LockType> locks = new(); var rng = new Random(101); foreach (var key in Enumerable.Range(0, numRecords / 5).Select(ii => rng.Next(numRecords))) locks[key] = (key & 1) == 0 ? LockType.Exclusive : LockType.Shared; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -1243,7 +1267,8 @@ public void EvictFromMainLogToLockTableTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } Assert.IsFalse(fht.LockTable.IsActive); @@ -1267,11 +1292,13 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT bool success = true; { using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; try { - luContext.ResumeThread(); + // We must retain this BeginLockable across the checkpoint, because we can't call EndLockable with locks held. + luContext.BeginUnsafe(); + luContext.BeginLockable(); // For this single-threaded test, the locking does not really have to be in order, but for consistency do it. foreach (var key in locks.Keys.OrderBy(k => k)) @@ -1280,11 +1307,12 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT catch (Exception) { ClearCountsOnError(session); + luContext.EndLockable(); throw; } finally { - luContext.SuspendThread(); + luContext.EndUnsafe(); } this.fht.Log.ShiftReadOnlyAddress(this.fht.Log.TailAddress, wait: true); @@ -1300,7 +1328,7 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT try { - luContext.ResumeThread(); + luContext.BeginUnsafe(); foreach (var key in locks.Keys.OrderBy(k => -k)) luContext.Unlock(key, locks[key]); } @@ -1311,7 +1339,8 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } } @@ -1324,8 +1353,8 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT await this.fht.RecoverAsync(fullCheckpointToken); { - using var luContext = this.session.GetLockableUnsafeContext(); - luContext.ResumeThread(); + var luContext = this.session.LockableUnsafeContext; + luContext.BeginUnsafe(); try { @@ -1343,7 +1372,7 @@ public async ValueTask CheckpointRecoverTest([Values] CheckpointType checkpointT } finally { - luContext.SuspendThread(); + luContext.EndUnsafe(); } } } @@ -1387,7 +1416,7 @@ await Task.WhenAll(Task.Run(() => PrimaryWriter(primaryStore, syncMode)), async static Task PrimaryWriter(FasterKV<long, long> primaryStore, SyncMode syncMode) { using var s1 = primaryStore.NewSession(new SimpleFunctions<long, long>()); - using var luc1 = s1.GetLockableUnsafeContext(); + var luc1 = s1.LockableUnsafeContext; // Upserting keys at primary starting from key 0 for (long key = 0; key < numSecondaryReaderKeys; key++) @@ -1413,7 +1442,8 @@ async static Task PrimaryWriter(FasterKV<long, long> primaryStore, SyncMode sync try { - luc1.ResumeThread(); + luc1.BeginUnsafe(); + luc1.BeginLockable(); luc1.Lock(key, LockType.Shared); } catch (Exception) @@ -1423,7 +1453,8 @@ async static Task PrimaryWriter(FasterKV<long, long> primaryStore, SyncMode sync } finally { - luc1.SuspendThread(); + luc1.EndLockable(); + luc1.EndUnsafe(); } } @@ -1432,7 +1463,8 @@ async static Task PrimaryWriter(FasterKV<long, long> primaryStore, SyncMode sync try { - luc1.ResumeThread(); + luc1.BeginUnsafe(); + luc1.BeginLockable(); // Unlock everything before we Dispose() luc1 for (long kk = 0; kk < numSecondaryReaderKeys; kk++) @@ -1447,14 +1479,15 @@ async static Task PrimaryWriter(FasterKV<long, long> primaryStore, SyncMode sync } finally { - luc1.SuspendThread(); + luc1.EndLockable(); + luc1.EndUnsafe(); } } async static Task SecondaryReader(FasterKV<long, long> secondaryStore, SyncMode syncMode) { using var s1 = secondaryStore.NewSession(new SimpleFunctions<long, long>()); - using var luc1 = s1.GetLockableUnsafeContext(); + var luc1 = s1.LockableUnsafeContext; long key = 0, output = 0; while (true) @@ -1474,7 +1507,8 @@ async static Task SecondaryReader(FasterKV<long, long> secondaryStore, SyncMode continue; } - luc1.ResumeThread(); + luc1.BeginUnsafe(); + luc1.BeginLockable(); try { while (true) @@ -1503,7 +1537,8 @@ async static Task SecondaryReader(FasterKV<long, long> secondaryStore, SyncMode } finally { - luc1.SuspendThread(); + luc1.EndLockable(); + luc1.EndUnsafe(); } } } diff --git a/cs/test/ModifiedBitTests.cs b/cs/test/ModifiedBitTests.cs index a0e4db5fb..c5863bd48 100644 --- a/cs/test/ModifiedBitTests.cs +++ b/cs/test/ModifiedBitTests.cs @@ -2,17 +2,11 @@ // Licensed under the MIT license. using System; -using System.Collections.Generic; using System.IO; -using System.Linq; using System.Threading; using FASTER.core; using NUnit.Framework; -using FASTER.test.ReadCacheTests; -using System.Threading.Tasks; using static FASTER.test.TestUtils; -using System.Diagnostics; -using FASTER.test.LockableUnsafeContext; namespace FASTER.test.ModifiedTests { @@ -96,16 +90,16 @@ static void AssertLockandModified(LockableContext<int, int, int, int, Empty, Sim static void AssertLockandModified(ClientSession<int, int, int, int, Empty, SimpleFunctions<int, int>> session, int key, bool xlock, bool slock, bool modfied = false) { - using (var luContext = session.GetLockableUnsafeContext()) - { - luContext.ResumeThread(); - var (isX, isS) = luContext.IsLocked(key); - var isM = luContext.IsModified(key); - Assert.AreEqual(xlock, isX, "xlock mismatch"); - Assert.AreEqual(slock, isS > 0, "slock mismatch"); - Assert.AreEqual(modfied, isM, "Modified mismatch"); - luContext.SuspendThread(); - } + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); + var (isX, isS) = luContext.IsLocked(key); + var isM = luContext.IsModified(key); + Assert.AreEqual(xlock, isX, "xlock mismatch"); + Assert.AreEqual(slock, isS > 0, "slock mismatch"); + Assert.AreEqual(modfied, isM, "Modified mismatch"); + luContext.EndLockable(); + luContext.EndUnsafe(); } [Test] @@ -118,7 +112,7 @@ public void LockAndNotModify() session.ResetModified(key); var LC = session.LockableContext; - LC.Acquire(); + LC.BeginLockable(); AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); LC.Lock(key, LockType.Exclusive); @@ -132,11 +126,9 @@ public void LockAndNotModify() LC.Unlock(key, LockType.Shared); AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); - LC.Release(); - + LC.EndLockable(); } - [Test] [Category(SmokeTestCategory)] public void ResetModifyForNonExistingKey() @@ -147,8 +139,6 @@ public void ResetModifyForNonExistingKey() AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); } - - [Test] [Category(SmokeTestCategory)] public void ModifyClientSession([Values(true, false)] bool flushToDisk, [Values] UpdateOp updateOp) @@ -211,57 +201,57 @@ public void ModifyLUC([Values(true, false)] bool flushToDisk, [Values] UpdateOp int key = numRecords - 500; int value = 14; session.ResetModified(key); - using (var luContext = session.GetLockableUnsafeContext()) - { - luContext.ResumeThread(); - AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); - luContext.SuspendThread(); - } + var luContext = session.LockableUnsafeContext; + luContext.BeginUnsafe(); + luContext.BeginLockable(); + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); + luContext.EndLockable(); + luContext.EndUnsafe(); if (flushToDisk) this.fht.Log.FlushAndEvict(wait: true); Status status = default; - using (var luContext = session.GetLockableUnsafeContext()) - { - luContext.ResumeThread(); + luContext.BeginUnsafe(); + + switch (updateOp) + { + case UpdateOp.Upsert: + status = luContext.Upsert(key, value); + break; + case UpdateOp.RMW: + status = luContext.RMW(key, value); + break; + case UpdateOp.Delete: + status = luContext.Delete(key); + break; + default: + break; + } + if (flushToDisk) + { switch (updateOp) { - case UpdateOp.Upsert: - status = luContext.Upsert(key, value); - break; case UpdateOp.RMW: - status = luContext.RMW(key, value); - break; - case UpdateOp.Delete: - status = luContext.Delete(key); + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); break; default: + Assert.IsTrue(status.NotFound); break; } - if (flushToDisk) - { - switch (updateOp) - { - case UpdateOp.RMW: - Assert.IsTrue(status.IsPending, status.ToString()); - luContext.CompletePending(wait: true); - break; - default: - Assert.IsTrue(status.NotFound); - break; - } - (status, var _) = luContext.Read(key); - Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); - } - if (updateOp == UpdateOp.Delete) - AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); - else - AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); - luContext.SuspendThread(); + (status, var _) = luContext.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); } + luContext.BeginLockable(); + if (updateOp == UpdateOp.Delete) + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: false); + else + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); + luContext.EndLockable(); + luContext.EndUnsafe(); } [Test] @@ -279,40 +269,40 @@ public void ModifyUC([Values(true, false)] bool flushToDisk, [Values] UpdateOp u this.fht.Log.FlushAndEvict(wait: true); Status status = default; - using (var unsafeContext = session.GetLockableUnsafeContext()) + var unsafeContext = session.LockableUnsafeContext; + + unsafeContext.BeginUnsafe(); + switch (updateOp) + { + case UpdateOp.Upsert: + status = unsafeContext.Upsert(key, value); + break; + case UpdateOp.RMW: + status = unsafeContext.RMW(key, value); + break; + case UpdateOp.Delete: + status = unsafeContext.Delete(key); + break; + default: + break; + } + if (flushToDisk) { - unsafeContext.ResumeThread(); switch (updateOp) { - case UpdateOp.Upsert: - status = unsafeContext.Upsert(key, value); - break; case UpdateOp.RMW: - status = unsafeContext.RMW(key, value); - break; - case UpdateOp.Delete: - status = unsafeContext.Delete(key); + Assert.IsTrue(status.IsPending, status.ToString()); + unsafeContext.CompletePending(wait: true); break; default: + Assert.IsTrue(status.NotFound); break; } - if (flushToDisk) - { - switch (updateOp) - { - case UpdateOp.RMW: - Assert.IsTrue(status.IsPending, status.ToString()); - unsafeContext.CompletePending(wait: true); - break; - default: - Assert.IsTrue(status.NotFound); - break; - } - (status, var _) = unsafeContext.Read(key); - Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); - } - unsafeContext.SuspendThread(); + (status, var _) = unsafeContext.Read(key); + Assert.IsTrue(status.Found || updateOp == UpdateOp.Delete); } + unsafeContext.EndUnsafe(); + if (updateOp == UpdateOp.Delete) AssertLockandModified(session, key, xlock: false, slock: false, modfied: false); else @@ -329,7 +319,7 @@ public void ModifyLC([Values(true, false)] bool flushToDisk, [Values] UpdateOp u int value = 14; session.ResetModified(key); var LC = session.LockableContext; - LC.Acquire(); + LC.BeginLockable(); AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); if (flushToDisk) @@ -370,10 +360,9 @@ public void ModifyLC([Values(true, false)] bool flushToDisk, [Values] UpdateOp u AssertLockandModified(LC, key, xlock: false, slock: false, modfied: false); else AssertLockandModified(LC, key, xlock: false, slock: false, modfied: true); - LC.Release(); + LC.EndLockable(); } - [Test] [Category(SmokeTestCategory)] public void CopyToTailTest() @@ -381,32 +370,32 @@ public void CopyToTailTest() Populate(); fht.Log.FlushAndEvict(wait: true); - using (var luContext = session.GetLockableUnsafeContext()) - { - int input = 0, output = 0, key = 200; - ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; + var luContext = session.LockableUnsafeContext; - luContext.ResumeThread(); + int input = 0, output = 0, key = 200; + ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; - // Check Read Copy to Tail resets the modfied - var status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); - Assert.IsTrue(status.IsPending, status.ToString()); - luContext.CompletePending(wait: true); - AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); + luContext.BeginUnsafe(); - // Check Read Copy to Tail resets the modfied on locked key - key += 10; - luContext.Lock(key, LockType.Exclusive); - status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); - Assert.IsTrue(status.IsPending, status.ToString()); - luContext.CompletePending(wait: true); - AssertLockandModified(luContext, key, xlock: true, slock: false, modfied: true); - luContext.Unlock(key, LockType.Exclusive); + // Check Read Copy to Tail resets the modfied + var status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); + luContext.BeginLockable(); + AssertLockandModified(luContext, key, xlock: false, slock: false, modfied: true); - luContext.SuspendThread(); - } - } + // Check Read Copy to Tail resets the modfied on locked key + key += 10; + luContext.Lock(key, LockType.Exclusive); + status = luContext.Read(ref key, ref input, ref output, ref readOptions, out _); + Assert.IsTrue(status.IsPending, status.ToString()); + luContext.CompletePending(wait: true); + AssertLockandModified(luContext, key, xlock: true, slock: false, modfied: true); + luContext.Unlock(key, LockType.Exclusive); + luContext.EndLockable(); + luContext.EndUnsafe(); + } } } diff --git a/cs/test/ReadCacheChainTests.cs b/cs/test/ReadCacheChainTests.cs index a33699e50..13191619e 100644 --- a/cs/test/ReadCacheChainTests.cs +++ b/cs/test/ReadCacheChainTests.cs @@ -495,7 +495,7 @@ public void EvictFromReadCacheToLockTableTest() CreateChain(); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; Dictionary<int, LockType> locks = new() { @@ -504,7 +504,8 @@ public void EvictFromReadCacheToLockTableTest() { highChainKey, LockType.Exclusive } }; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); try { @@ -536,7 +537,8 @@ public void EvictFromReadCacheToLockTableTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } Assert.IsFalse(fht.LockTable.IsActive); @@ -556,7 +558,7 @@ public void TransferFromLockTableToReadCacheTest() //CreateChain(); using var session = fht.NewSession(new SimpleFunctions<int, int>()); - using var luContext = session.GetLockableUnsafeContext(); + var luContext = session.LockableUnsafeContext; Dictionary<int, LockType> locks = new() { @@ -565,7 +567,9 @@ public void TransferFromLockTableToReadCacheTest() { highChainKey, LockType.Exclusive } }; - luContext.ResumeThread(); + luContext.BeginUnsafe(); + luContext.BeginLockable(); + try { // For this single-threaded test, the locking does not really have to be in order, but for consistency do it. @@ -612,7 +616,8 @@ public void TransferFromLockTableToReadCacheTest() } finally { - luContext.SuspendThread(); + luContext.EndLockable(); + luContext.EndUnsafe(); } Assert.IsFalse(fht.LockTable.IsActive); diff --git a/cs/test/StateMachineTests.cs b/cs/test/StateMachineTests.cs index a6fadee9e..3e2737b8e 100644 --- a/cs/test/StateMachineTests.cs +++ b/cs/test/StateMachineTests.cs @@ -96,8 +96,7 @@ public void StateMachineTest1() // Dispose session s2; does not move state machine forward s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -144,8 +143,7 @@ public void StateMachineTest2() // We should be in REST, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 2), fht1.SystemState)); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -167,7 +165,7 @@ public void StateMachineTest3() Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PREPARE, 1), SystemState.Make(s1.ctx.phase, s1.ctx.version))); // Suspend s1 - uc1.SuspendThread(); + uc1.EndUnsafe(); // Since s2 is the only session now, it will fast-foward state machine // to completion @@ -179,15 +177,14 @@ public void StateMachineTest3() // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; - uc1.ResumeThread(); + uc1.BeginUnsafe(); // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -218,7 +215,7 @@ public void StateMachineTest4() Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.IN_PROGRESS, 2), SystemState.Make(s1.ctx.phase, s1.ctx.version))); // Suspend s1 - uc1.SuspendThread(); + uc1.EndUnsafe(); // Since s2 is the only session now, it will fast-foward state machine // to completion @@ -230,15 +227,14 @@ public void StateMachineTest4() // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; - uc1.ResumeThread(); + uc1.BeginUnsafe(); // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -286,7 +282,7 @@ public void StateMachineTest5() uc1.Refresh(); // Suspend s1 - uc1.SuspendThread(); + uc1.EndUnsafe(); // Since s2 is the only session now, it will fast-foward state machine // to completion @@ -298,15 +294,14 @@ public void StateMachineTest5() // Expect no checkpoint completion callback on resume f.checkpointCallbackExpectation = 0; - uc1.ResumeThread(); + uc1.BeginUnsafe(); // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -320,7 +315,7 @@ public void StateMachineTest6() Prepare(out var f, out var s1, out var uc1, out var s2); // Suspend s1 - uc1.SuspendThread(); + uc1.EndUnsafe(); // s1 is now in REST, 1 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 1), SystemState.Make(s1.ctx.phase, s1.ctx.version))); @@ -346,20 +341,17 @@ public void StateMachineTest6() // Expect checkpoint completion callback on resume f.checkpointCallbackExpectation = 1; - uc1.ResumeThread(); + uc1.BeginUnsafe(); // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); } - - [TestCase] [Category("FasterKV"), Category("CheckpointRestore")] public void LUCScenario1() @@ -407,8 +399,8 @@ public void LUCScenario2() // System should be in REST, 1 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 1), fht1.SystemState)); - var uc1 = s1.GetUnsafeContext(); - uc1.ResumeThread(); + var uc1 = s1.UnsafeContext; + uc1.BeginUnsafe(); fht1.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver); @@ -427,8 +419,7 @@ public void LUCScenario2() lts.getLUC(); Assert.IsFalse(lts.isProtected); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); // fast-foward state machine to completion ts.Refresh(); @@ -457,8 +448,9 @@ public void LUCScenario3() Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 1), fht1.SystemState)); // Start first LUC before checkpoint - var luc1 = s1.GetLockableUnsafeContext(); - luc1.ResumeThread(); + var luc1 = s1.LockableUnsafeContext; + luc1.BeginUnsafe(); + luc1.BeginLockable(); fht1.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver); @@ -480,8 +472,8 @@ public void LUCScenario3() Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PREPARE, 1), fht1.SystemState)); // End first LUC - luc1.SuspendThread(); - luc1.Dispose(); + luc1.EndLockable(); + luc1.EndUnsafe(); s1.Refresh(); // System should be in IN_PROGRESS, 1 @@ -553,8 +545,7 @@ public void StateMachineCallbackTest1() // Dispose session s2; does not move state machine forward s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -603,8 +594,7 @@ public void VersionChangeTest() // Dispose session s2; does not move state machine forward s2.Dispose(); - uc1.SuspendThread(); - uc1.Dispose(); + uc1.EndUnsafe(); s1.Dispose(); RecoverAndTest(log); @@ -644,8 +634,8 @@ void Prepare(out SimpleFunctions f, fht1.Log.ShiftReadOnlyAddress(fht1.Log.TailAddress, true); // Create unsafe context and hold epoch to prepare for manual state machine driver - uc1 = s1.GetUnsafeContext(); - uc1.ResumeThread(); + uc1 = s1.UnsafeContext; + uc1.BeginUnsafe(); // Start session s2 on another thread for testing s2 = fht1.For(f).CreateThreadSession(f); @@ -697,14 +687,8 @@ bool tryStartLUC( ref LockableUnsafeContext<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> luContext, ClientSession<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> session) { - - luContext = session.GetLockableUnsafeContext(); - if (session.IsInPreparePhase()) - { - luContext.Dispose(); - return false; - } - return true; + luContext = session.LockableUnsafeContext; + return !session.IsInPreparePhase(); } void RecoverAndTest(IDevice log) { diff --git a/cs/test/ThreadSession.cs b/cs/test/ThreadSession.cs index a31846002..afa7baa11 100644 --- a/cs/test/ThreadSession.cs +++ b/cs/test/ThreadSession.cs @@ -58,8 +58,8 @@ public void Dispose() private void SecondSession() { s2 = fht.NewSession(f, null); - uc2 = s2.GetUnsafeContext(); - uc2.ResumeThread(); + uc2 = s2.UnsafeContext; + uc2.BeginUnsafe(); ev.Set(); @@ -73,8 +73,7 @@ private void SecondSession() ev.Set(); break; case "dispose": - uc2.SuspendThread(); - uc2.Dispose(); + uc2.EndUnsafe(); s2.Dispose(); ev.Set(); return; @@ -149,30 +148,28 @@ private void LUCThread() case "dispose": if (isProtected) { - luc.SuspendThread(); - luc.Dispose(); - + luc.EndUnsafe(); } session.Dispose(); ev.Set(); return; case "getLUC": - luc = session.GetLockableUnsafeContext(); + luc = session.LockableUnsafeContext; if (session.IsInPreparePhase()) { - luc.Dispose(); this.isProtected = false; } else { - luc.ResumeThread(); + luc.BeginUnsafe(); + luc.BeginLockable(); this.isProtected = true; } ev.Set(); break; case "DisposeLUC": - luc.SuspendThread(); - luc.Dispose(); + luc.EndLockable(); + luc.EndUnsafe(); this.isProtected = false; ev.Set(); break; diff --git a/cs/test/UnsafeContextTests.cs b/cs/test/UnsafeContextTests.cs index 6aa62e398..6ac8277b1 100644 --- a/cs/test/UnsafeContextTests.cs +++ b/cs/test/UnsafeContextTests.cs @@ -39,14 +39,13 @@ private void Setup(long size, LogSettings logSettings, DeviceType deviceType) logSettings.LogDevice = log; fht = new FasterKV<KeyStruct, ValueStruct>(size, logSettings); fullSession = fht.For(new Functions()).NewSession<Functions>(); - uContext = fullSession.GetUnsafeContext(); + uContext = fullSession.UnsafeContext; } [TearDown] public void TearDown() { - uContext?.Dispose(); - uContext = null; + uContext = default; fullSession?.Dispose(); fullSession = null; fht?.Dispose(); @@ -75,7 +74,7 @@ private void AssertCompleted(Status expected, Status actual) public void NativeInMemWriteRead([Values] DeviceType deviceType) { Setup(128, new LogSettings { PageSizeBits = 10, MemorySizeBits = 12, SegmentSizeBits = 22 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -94,7 +93,7 @@ public void NativeInMemWriteRead([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -104,7 +103,7 @@ public void NativeInMemWriteRead([Values] DeviceType deviceType) public void NativeInMemWriteReadDelete([Values] DeviceType deviceType) { Setup(128, new LogSettings { PageSizeBits = 10, MemorySizeBits = 12, SegmentSizeBits = 22 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -135,7 +134,7 @@ public void NativeInMemWriteReadDelete([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -152,7 +151,7 @@ public void NativeInMemWriteReadDelete2() // Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -193,7 +192,7 @@ public void NativeInMemWriteReadDelete2() } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -209,7 +208,7 @@ public unsafe void NativeInMemWriteRead2() // Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -256,7 +255,7 @@ public unsafe void NativeInMemWriteRead2() } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -274,7 +273,7 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] var sw = Stopwatch.StartNew(); Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -289,9 +288,9 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] } else { - uContext.SuspendThread(); + uContext.EndUnsafe(); var status = (await uContext.UpsertAsync(ref key1, ref value)).Complete(); - uContext.ResumeThread(); + uContext.BeginUnsafe(); Assert.IsFalse(status.IsPending); } } @@ -313,9 +312,9 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] } else { - uContext.SuspendThread(); + uContext.EndUnsafe(); (status, output) = (await uContext.ReadAsync(ref key1, ref input)).Complete(); - uContext.ResumeThread(); + uContext.BeginUnsafe(); } if (!status.IsPending) { @@ -329,9 +328,9 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] } else { - uContext.SuspendThread(); + uContext.EndUnsafe(); await uContext.CompletePendingAsync(); - uContext.ResumeThread(); + uContext.BeginUnsafe(); } // Shift head and retry - should not find in main memory now @@ -356,9 +355,9 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] } else { - uContext.SuspendThread(); + uContext.EndUnsafe(); outputs = await uContext.CompletePendingWithOutputsAsync(); - uContext.ResumeThread(); + uContext.BeginUnsafe(); } int count = 0; @@ -373,7 +372,7 @@ public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -386,7 +385,7 @@ public unsafe void NativeInMemRMWRefKeys([Values] DeviceType deviceType) OutputStruct output = default; Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -446,7 +445,7 @@ public unsafe void NativeInMemRMWRefKeys([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -458,7 +457,7 @@ public unsafe void NativeInMemRMWNoRefKeys([Values] DeviceType deviceType) InputStruct input = default; Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -511,7 +510,7 @@ public unsafe void NativeInMemRMWNoRefKeys([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -524,7 +523,7 @@ public void ReadNoRefKeyInputOutput([Values] DeviceType deviceType) InputStruct input = default; Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -543,7 +542,7 @@ public void ReadNoRefKeyInputOutput([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -553,7 +552,7 @@ public void ReadNoRefKeyInputOutput([Values] DeviceType deviceType) public void ReadNoRefKey([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -572,7 +571,7 @@ public void ReadNoRefKey([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -584,7 +583,7 @@ public void ReadNoRefKey([Values] DeviceType deviceType) public void ReadWithoutInput([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -605,7 +604,7 @@ public void ReadWithoutInput([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -619,7 +618,7 @@ public void ReadWithoutSerialID() deviceType = DeviceType.MLSD; Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -640,7 +639,7 @@ public void ReadWithoutSerialID() } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -651,7 +650,7 @@ public void ReadWithoutSerialID() public void ReadBareMinParams([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -670,7 +669,7 @@ public void ReadBareMinParams([Values] DeviceType deviceType) } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } @@ -684,7 +683,7 @@ public void ReadAtAddressReadFlagsNone() deviceType = DeviceType.MLSD; Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); - uContext.ResumeThread(); + uContext.BeginUnsafe(); try { @@ -706,7 +705,7 @@ public void ReadAtAddressReadFlagsNone() } finally { - uContext.SuspendThread(); + uContext.EndUnsafe(); } } } From faed0dc192dae27af50369563382707b9b82b942 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Fri, 23 Sep 2022 13:02:17 -0700 Subject: [PATCH 2/3] Add AcquireLockable check for in-prepare checkpoint --- cs/src/core/ClientSession/ClientSession.cs | 25 +++++++++++++++++++--- cs/src/core/Index/FASTER/FASTER.cs | 3 +++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 5cebbb86d..048558897 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -55,8 +55,21 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions> internal void AcquireLockable() { CheckIsNotAcquiredLockable(); - fht.IncrementNumLockingSessions(); - isAcquiredLockable = true; + + while (true) + { + // Checkpoints cannot complete while we have active locking sessions. + while (IsInPreparePhase()) + Thread.Yield(); + + fht.IncrementNumLockingSessions(); + isAcquiredLockable = true; + + if (!IsInPreparePhase()) + break; + InternalReleaseLockable(); + Thread.Yield(); + } } internal void ReleaseLockable(string methodName) @@ -64,6 +77,12 @@ internal void ReleaseLockable(string methodName) CheckIsAcquiredLockable(); if (TotalLockCount > 0) throw new FasterException($"{methodName} called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks"); + InternalReleaseLockable(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void InternalReleaseLockable() + { isAcquiredLockable = false; fht.DecrementNumLockingSessions(); } @@ -878,7 +897,7 @@ void IClientSession.AtomicSwitch(long version) /// <summary> /// Return true if Faster State Machine is in PREPARE sate /// </summary> - public bool IsInPreparePhase() + internal bool IsInPreparePhase() { return this.fht.SystemState.Phase == Phase.PREPARE; } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 77bcacf77..718c5c4a8 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -545,7 +545,10 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default } if (valueTasks.Count == 0) + { + // Note: The state machine will not advance as long as there are active locking sessions. continue; // we need to re-check loop, so we return only when we are at REST + } foreach (var task in valueTasks) { From 6b567ca71dd95c2de0482c55785e21ba112c0b1d Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli <badrishc@microsoft.com> Date: Fri, 23 Sep 2022 15:25:50 -0700 Subject: [PATCH 3/3] nits --- cs/src/core/ClientSession/ClientSession.cs | 4 ++-- cs/src/core/ClientSession/LockableContext.cs | 2 +- cs/src/core/ClientSession/LockableUnsafeContext.cs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 048558897..01ec4bcaa 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -72,11 +72,11 @@ internal void AcquireLockable() } } - internal void ReleaseLockable(string methodName) + internal void ReleaseLockable() { CheckIsAcquiredLockable(); if (TotalLockCount > 0) - throw new FasterException($"{methodName} called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks"); + throw new FasterException($"EndLockable called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks"); InternalReleaseLockable(); } diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs index aa57c535c..ceb11643f 100644 --- a/cs/src/core/ClientSession/LockableContext.cs +++ b/cs/src/core/ClientSession/LockableContext.cs @@ -32,7 +32,7 @@ internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Funct public void BeginLockable() => clientSession.AcquireLockable(); /// <inheritdoc/> - public void EndLockable() => clientSession.ReleaseLockable("LockableContext.EndLockable"); + public void EndLockable() => clientSession.ReleaseLockable(); #endregion Begin/EndLockable diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index e676129ea..00b600ccf 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -44,7 +44,7 @@ internal LockableUnsafeContext(ClientSession<Key, Value, Input, Output, Context, public void BeginLockable() => clientSession.AcquireLockable(); /// <inheritdoc/> - public void EndLockable() => clientSession.ReleaseLockable("LockableUnsafeContext.EndLockable"); + public void EndLockable() => clientSession.ReleaseLockable(); #endregion Begin/EndLockable #region Key Locking