diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 3ad942ffa..f4b30578f 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -225,8 +225,8 @@ public FasterKV(long size, LogSettings logSettings, Initialize(size, sectorSize); systemState = default; - systemState.phase = Phase.REST; - systemState.version = 1; + systemState.Phase = Phase.REST; + systemState.Version = 1; } /// @@ -404,6 +404,11 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp /// Checkpoint type /// For snapshot, try to store as incremental delta over last snapshot /// Cancellation token + /// + /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger + /// than current version. Actual new version may have version number greater than supplied number. If the supplied + /// number is -1, checkpoint will unconditionally create a new version. + /// /// /// (bool success, Guid token) /// success: Whether we successfully initiated the checkpoint (initiation may @@ -508,8 +513,8 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default while (true) { var systemState = this.systemState; - if (systemState.phase == Phase.REST || systemState.phase == Phase.PREPARE_GROW || - systemState.phase == Phase.IN_PROGRESS_GROW) + if (systemState.Phase == Phase.REST || systemState.Phase == Phase.PREPARE_GROW || + systemState.Phase == Phase.IN_PROGRESS_GROW) return; List valueTasks = new(); @@ -718,7 +723,7 @@ public bool GrowIndex() while (true) { SystemState _systemState = SystemState.Copy(ref systemState); - if (_systemState.phase == Phase.IN_PROGRESS_GROW) + if (_systemState.Phase == Phase.IN_PROGRESS_GROW) { SplitBuckets(0); epoch.ProtectAndDrain(); @@ -726,7 +731,7 @@ public bool GrowIndex() else { SystemState.RemoveIntermediate(ref _systemState); - if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW) + if (_systemState.Phase != Phase.PREPARE_GROW && _systemState.Phase != Phase.IN_PROGRESS_GROW) { return true; } @@ -752,7 +757,7 @@ public void Dispose() checkpointManager?.Dispose(); } - private void UpdateVarLen(ref VariableLengthStructSettings variableLengthStructSettings) + private static void UpdateVarLen(ref VariableLengthStructSettings variableLengthStructSettings) { if (typeof(Key) == typeof(SpanByte)) { diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 9fcf00371..2752cd1b3 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1735,7 +1735,7 @@ private void HeavyEnter(long hash, Faster { // We spin-wait as a simplification // Could instead do a "heavy operation" here - while (systemState.phase != Phase.IN_PROGRESS_GROW) + while (systemState.Phase != Phase.IN_PROGRESS_GROW) Thread.SpinWait(100); InternalRefresh(ctx, session); } @@ -1954,7 +1954,7 @@ internal OperationStatus InternalTryCopyToTail(string guid, out F // We have recovered the corresponding session. // Now obtain the session by first locking the rest phase var currentState = SystemState.Copy(ref systemState); - if (currentState.phase == Phase.REST) + if (currentState.Phase == Phase.REST) { var intermediateState = SystemState.MakeIntermediate(currentState); if (MakeTransition(currentState, intermediateState)) @@ -70,7 +70,7 @@ internal void InternalRefresh(FasterExecu // We check if we are in normal mode var newPhaseInfo = SystemState.Copy(ref systemState); - if (ctx.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && ctx.version == newPhaseInfo.version) + if (ctx.phase == Phase.REST && newPhaseInfo.Phase == Phase.REST && ctx.version == newPhaseInfo.Version) { return; } @@ -177,7 +177,7 @@ internal bool InternalCompletePending( } } - internal bool InRestPhase() => systemState.phase == Phase.REST; + internal bool InRestPhase() => systemState.Phase == Phase.REST; #region Complete Retry Requests internal void InternalCompleteRetryRequests( diff --git a/cs/src/core/Index/Recovery/DeltaLog.cs b/cs/src/core/Index/Recovery/DeltaLog.cs index 90d0d752a..60c4c2a2d 100644 --- a/cs/src/core/Index/Recovery/DeltaLog.cs +++ b/cs/src/core/Index/Recovery/DeltaLog.cs @@ -10,9 +10,20 @@ namespace FASTER.core { + /// + /// The type of a record in the delta (incremental) log + /// public enum DeltaLogEntryType : int { - DELTA, CHECKPOINT_METADATA + /// + /// The entry is a delta record + /// + DELTA, + + /// + /// The entry is checkpoint metadata + /// + CHECKPOINT_METADATA } [StructLayout(LayoutKind.Explicit, Size = DeltaLog.HeaderSize)] diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs index 2365e8616..a2ccfc506 100644 --- a/cs/src/core/Index/Recovery/ICheckpointManager.cs +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -65,7 +65,6 @@ public interface ICheckpointManager : IDisposable /// /// /// - /// any additional (user-specified) information to persist with the checkpoint void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog); /// diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs index 43d4da866..079826f17 100644 --- a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs +++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs @@ -122,6 +122,8 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken) /// /// Token /// Delta log + /// whether or not to scan through the delta log to acquire latest entry + /// version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version. /// Metadata, or null if invalid public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) { diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 9cbc15954..47a52d536 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -364,8 +364,8 @@ private bool RecoverToInitialPage(IndexCheckpointInfo recoveredICInfo, HybridLog currentSyncStateMachine = null; // Set new system state after recovery - systemState.phase = Phase.REST; - systemState.version = recoveredHLCInfo.info.version + 1; + systemState.Phase = Phase.REST; + systemState.Version = recoveredHLCInfo.info.version + 1; if (!recoveredICInfo.IsDefault() && recoveryCountdown != null) { diff --git a/cs/src/core/Index/Synchronization/FasterStateMachine.cs b/cs/src/core/Index/Synchronization/FasterStateMachine.cs index cbc430b44..663b15204 100644 --- a/cs/src/core/Index/Synchronization/FasterStateMachine.cs +++ b/cs/src/core/Index/Synchronization/FasterStateMachine.cs @@ -45,7 +45,7 @@ public partial class FasterKV /// /// Current version number of the store /// - public long CurrentVersion => systemState.version; + public long CurrentVersion => systemState.Version; /// /// Registers the given callback to be invoked for every state machine transition. Not safe to call with @@ -76,9 +76,9 @@ private bool StartStateMachine(ISynchronizationStateMachine stateMachine) [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool MakeTransition(SystemState expectedState, SystemState nextState) { - if (Interlocked.CompareExchange(ref systemState.word, nextState.word, expectedState.word) != - expectedState.word) return false; - Debug.WriteLine("Moved to {0}, {1}", nextState.phase, nextState.version); + if (Interlocked.CompareExchange(ref systemState.Word, nextState.Word, expectedState.Word) != + expectedState.Word) return false; + Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version); return true; } @@ -109,7 +109,7 @@ internal void GlobalStateMachineStep(SystemState expectedState) currentSyncStateMachine.GlobalAfterEnteringState(nextState, this); // Mark the state machine done as we exit the state machine. - if (nextState.phase == Phase.REST) stateMachineActive = 0; + if (nextState.Phase == Phase.REST) stateMachineActive = 0; } @@ -117,9 +117,9 @@ internal void GlobalStateMachineStep(SystemState expectedState) [MethodImpl(MethodImplOptions.AggressiveInlining)] private SystemState StartOfCurrentCycle(SystemState currentGlobalState) { - return currentGlobalState.phase < Phase.REST - ? SystemState.Make(Phase.REST, currentGlobalState.version - 1) - : SystemState.Make(Phase.REST, currentGlobalState.version); + return currentGlobalState.Phase < Phase.REST + ? SystemState.Make(Phase.REST, currentGlobalState.Version - 1) + : SystemState.Make(Phase.REST, currentGlobalState.Version); } // Given the current thread state and global state, fast forward the thread state to the @@ -127,8 +127,8 @@ private SystemState StartOfCurrentCycle(SystemState currentGlobalState) [MethodImpl(MethodImplOptions.AggressiveInlining)] private SystemState FastForwardToCurrentCycle(SystemState threadState, SystemState targetStartState) { - if (threadState.version < targetStartState.version || - threadState.version == targetStartState.version && threadState.phase < targetStartState.phase) + if (threadState.Version < targetStartState.Version || + threadState.Version == targetStartState.Version && threadState.Phase < targetStartState.Phase) { return targetStartState; } @@ -148,7 +148,7 @@ internal bool SameCycle(FasterExecutionContext( #region Get returning thread to start of current cycle, issuing completion callbacks if needed if (ctx != null) { - if (ctx.version < targetStartState.version) + if (ctx.version < targetStartState.Version) { // Issue CPR callback for full session if (ctx.serialNum != -1) @@ -214,7 +214,7 @@ private void ThreadStateMachineStep( fasterSession?.CheckpointCompletionCallback(ctx.guid, commitPoint); } } - if ((ctx.version == targetStartState.version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine)) + if ((ctx.version == targetStartState.Version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine)) { IssueCompletionCallback(ctx, fasterSession); } @@ -223,12 +223,12 @@ private void ThreadStateMachineStep( // No state machine associated with target, or target is in REST phase: // we can directly fast forward session to target state - if (currentTask == null || targetState.phase == Phase.REST) + if (currentTask == null || targetState.Phase == Phase.REST) { if (ctx != null) { - ctx.phase = targetState.phase; - ctx.version = targetState.version; + ctx.phase = targetState.Phase; + ctx.version = targetState.Version; ctx.threadStateMachine = currentTask; } return; @@ -257,22 +257,22 @@ private void ThreadStateMachineStep( do { Debug.Assert( - (threadState.version < targetState.version) || - (threadState.version == targetState.version && - (threadState.phase <= targetState.phase || currentTask is IndexSnapshotStateMachine) + (threadState.Version < targetState.Version) || + (threadState.Version == targetState.Version && + (threadState.Phase <= targetState.Phase || currentTask is IndexSnapshotStateMachine) )); currentTask.OnThreadEnteringState(threadState, previousState, this, ctx, fasterSession, valueTasks, token); if (ctx != null) { - ctx.phase = threadState.phase; - ctx.version = threadState.version; + ctx.phase = threadState.Phase; + ctx.version = threadState.Version; } - previousState.word = threadState.word; + previousState.Word = threadState.Word; threadState = currentTask.NextState(threadState); - if (systemState.word != targetState.word) + if (systemState.Word != targetState.Word) { var tmp = SystemState.Copy(ref systemState); if (currentSyncStateMachine == currentTask) @@ -281,7 +281,7 @@ private void ThreadStateMachineStep( SystemState.RemoveIntermediate(ref targetState); } } - } while (previousState.word != targetState.word); + } while (previousState.Word != targetState.Word); #endregion return; diff --git a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs index 9018655fb..942c7ad2e 100644 --- a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs +++ b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs @@ -16,7 +16,7 @@ public void GlobalBeforeEnteringState( SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREP_INDEX_CHECKPOINT: Debug.Assert(faster._indexCheckpoint.IsDefault() && @@ -25,7 +25,7 @@ public void GlobalBeforeEnteringState( faster._indexCheckpointToken = fullCheckpointToken; faster._hybridLogCheckpointToken = fullCheckpointToken; faster.InitializeIndexCheckpoint(faster._indexCheckpointToken); - faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version); + faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.Version); break; case Phase.WAIT_FLUSH: faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress(); @@ -79,19 +79,19 @@ public FullCheckpointStateMachine(ISynchronizationTask checkpointBackend, long t public override SystemState NextState(SystemState start) { var result = SystemState.Copy(ref start); - switch (start.phase) + switch (start.Phase) { case Phase.REST: - result.phase = Phase.PREP_INDEX_CHECKPOINT; + result.Phase = Phase.PREP_INDEX_CHECKPOINT; break; case Phase.PREP_INDEX_CHECKPOINT: - result.phase = Phase.PREPARE; + result.Phase = Phase.PREPARE; break; case Phase.WAIT_PENDING: - result.phase = Phase.WAIT_INDEX_CHECKPOINT; + result.Phase = Phase.WAIT_INDEX_CHECKPOINT; break; case Phase.WAIT_INDEX_CHECKPOINT: - result.phase = Phase.WAIT_FLUSH; + result.Phase = Phase.WAIT_FLUSH; break; default: result = base.NextState(start); diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index 0460a2f8d..8e50fee92 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -17,22 +17,22 @@ internal abstract class HybridLogCheckpointOrchestrationTask : ISynchronizationT public virtual void GlobalBeforeEnteringState(SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREPARE: - lastVersion = faster.systemState.version; + lastVersion = faster.systemState.Version; if (faster._hybridLogCheckpoint.IsDefault()) { faster._hybridLogCheckpointToken = Guid.NewGuid(); - faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version); + faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.Version); } - faster._hybridLogCheckpoint.info.version = next.version; + faster._hybridLogCheckpoint.info.version = next.Version; faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.startLogicalAddress); break; case Phase.WAIT_FLUSH: faster._hybridLogCheckpoint.info.headAddress = faster.hlog.HeadAddress; faster._hybridLogCheckpoint.info.beginAddress = faster.hlog.BeginAddress; - faster._hybridLogCheckpoint.info.nextVersion = next.version; + faster._hybridLogCheckpoint.info.nextVersion = next.Version; break; case Phase.PERSISTENCE_CALLBACK: CollectMetadata(next, faster); @@ -64,7 +64,7 @@ protected void CollectMetadata(SystemState next, FasterKV @@ -83,7 +83,7 @@ public virtual void OnThreadState(SystemState next, { base.GlobalBeforeEnteringState(next, faster); - if (next.phase == Phase.PREPARE) + if (next.Phase == Phase.PREPARE) { faster._lastSnapshotCheckpoint.Dispose(); } - if (next.phase != Phase.WAIT_FLUSH) return; + if (next.Phase != Phase.WAIT_FLUSH) return; faster.hlog.ShiftReadOnlyToTail(out var tailAddress, out faster._hybridLogCheckpoint.flushedSemaphore); @@ -137,7 +137,7 @@ public override void OnThreadState public override void GlobalBeforeEnteringState(SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREPARE: faster._lastSnapshotCheckpoint.Dispose(); @@ -238,7 +238,7 @@ public override void OnThreadState public override void GlobalBeforeEnteringState(SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREPARE: faster._hybridLogCheckpoint = faster._lastSnapshotCheckpoint; base.GlobalBeforeEnteringState(next, faster); faster._hybridLogCheckpoint.info.startLogicalAddress = faster.hlog.FlushedUntilAddress; - faster._hybridLogCheckpoint.prevVersion = next.version; + faster._hybridLogCheckpoint.prevVersion = next.Version; break; case Phase.WAIT_FLUSH: base.GlobalBeforeEnteringState(next, faster); @@ -327,7 +327,7 @@ public override void OnThreadState( SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREPARE_GROW: // nothing to do @@ -45,7 +45,7 @@ public void GlobalAfterEnteringState( SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREPARE_GROW: faster.epoch.BumpCurrentEpoch(() => faster.GlobalStateMachineStep(next)); @@ -70,7 +70,7 @@ public void OnThreadState( CancellationToken token = default) where FasterSession : IFasterSession { - switch (current.phase) + switch (current.Phase) { case Phase.PREPARE_GROW: case Phase.IN_PROGRESS_GROW: @@ -96,16 +96,16 @@ public IndexResizeStateMachine() : base(new IndexResizeTask()) {} public override SystemState NextState(SystemState start) { var nextState = SystemState.Copy(ref start); - switch (start.phase) + switch (start.Phase) { case Phase.REST: - nextState.phase = Phase.PREPARE_GROW; + nextState.Phase = Phase.PREPARE_GROW; break; case Phase.PREPARE_GROW: - nextState.phase = Phase.IN_PROGRESS_GROW; + nextState.Phase = Phase.IN_PROGRESS_GROW; break; case Phase.IN_PROGRESS_GROW: - nextState.phase = Phase.REST; + nextState.Phase = Phase.REST; break; default: throw new FasterException("Invalid Enum Argument"); diff --git a/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs index e4796d5f9..bc9f351ea 100644 --- a/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs +++ b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs @@ -16,7 +16,7 @@ public void GlobalBeforeEnteringState( SystemState next, FasterKV faster) { - switch (next.phase) + switch (next.Phase) { case Phase.PREP_INDEX_CHECKPOINT: if (faster._indexCheckpoint.IsDefault()) @@ -67,7 +67,7 @@ public void OnThreadState( CancellationToken token = default) where FasterSession : IFasterSession { - switch (current.phase) + switch (current.Phase) { case Phase.PREP_INDEX_CHECKPOINT: faster.GlobalStateMachineStep(current); @@ -109,16 +109,16 @@ public IndexSnapshotStateMachine() : base(new IndexSnapshotTask()) public override SystemState NextState(SystemState start) { var result = SystemState.Copy(ref start); - switch (start.phase) + switch (start.Phase) { case Phase.REST: - result.phase = Phase.PREP_INDEX_CHECKPOINT; + result.Phase = Phase.PREP_INDEX_CHECKPOINT; break; case Phase.PREP_INDEX_CHECKPOINT: - result.phase = Phase.WAIT_INDEX_ONLY_CHECKPOINT; + result.Phase = Phase.WAIT_INDEX_ONLY_CHECKPOINT; break; case Phase.WAIT_INDEX_ONLY_CHECKPOINT: - result.phase = Phase.REST; + result.Phase = Phase.REST; break; default: throw new FasterException("Invalid Enum Argument"); diff --git a/cs/src/core/Index/Synchronization/StateTransitions.cs b/cs/src/core/Index/Synchronization/StateTransitions.cs index c683aec8f..ace3af510 100644 --- a/cs/src/core/Index/Synchronization/StateTransitions.cs +++ b/cs/src/core/Index/Synchronization/StateTransitions.cs @@ -21,83 +21,160 @@ internal struct ResizeInfo public long word; } + /// + /// The current phase of a state-machine operation such as a checkpoint + /// public enum Phase : int { - IN_PROGRESS, + /// In-progress phase, entering (v+1) version + IN_PROGRESS, + + /// Wait-pending phase, waiting for pending (v) operations to complete WAIT_PENDING, + + /// Wait for an index checkpoint to finish WAIT_INDEX_CHECKPOINT, + + /// Wait for data flush to complete WAIT_FLUSH, + + /// After flush has completed, write metadata to persistent storage and issue user callbacks PERSISTENCE_CALLBACK, + + /// The default phase; no state-machine operation is operating REST, + + /// Prepare for an index checkpoint PREP_INDEX_CHECKPOINT, + + /// Wait for an index-only checkpoint to complete WAIT_INDEX_ONLY_CHECKPOINT, + + /// Prepare for a checkpoint, still in (v) version PREPARE, - PREPARE_GROW, - IN_PROGRESS_GROW, + + /// Prepare to resize the index + PREPARE_GROW, + + /// Index resizing is in progress + IN_PROGRESS_GROW, + + /// Internal intermediate state of state machine INTERMEDIATE = 16, }; + /// + /// The current state of a state-machine operation such as a checkpoint. + /// [StructLayout(LayoutKind.Explicit, Size = 8)] public struct SystemState { + /// + /// The current of the operation + /// [FieldOffset(0)] - public Phase phase; + public Phase Phase; + /// + /// The version of the database when this operation is complete + /// [FieldOffset(4)] - public int version; + public int Version; + /// + /// The word containing information in bitfields + /// [FieldOffset(0)] - public long word; + internal long Word; + /// + /// Copy the into this + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static SystemState Copy(ref SystemState other) + internal static SystemState Copy(ref SystemState other) { var info = default(SystemState); - info.word = other.word; + info.Word = other.Word; return info; } + /// + /// Create a with the specified values + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static SystemState Make(Phase status, int version) + internal static SystemState Make(Phase status, int version) { var info = default(SystemState); - info.phase = status; - info.version = version; + info.Phase = status; + info.Version = version; return info; } + /// + /// Create a copy of the passed that is marked with the phase + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static SystemState MakeIntermediate(SystemState state) - => Make(state.phase | Phase.INTERMEDIATE, state.version); + internal static SystemState MakeIntermediate(SystemState state) + => Make(state.Phase | Phase.INTERMEDIATE, state.Version); + /// + /// Create a copy of the passed that is not marked with the phase + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void RemoveIntermediate(ref SystemState state) + internal static void RemoveIntermediate(ref SystemState state) { - state.phase &= ~Phase.INTERMEDIATE; + state.Phase &= ~Phase.INTERMEDIATE; } - public static bool Equal(SystemState s1, SystemState s2) + /// + /// Compare two s for equality + /// + internal static bool Equal(SystemState s1, SystemState s2) { - return s1.word == s2.word; + return s1.Word == s2.Word; } + /// public override string ToString() { - return $"[{phase},{version}]"; - } - - public bool Equals(SystemState other) - { - return word == other.word; + return $"[{Phase},{Version}]"; } + /// + /// Compare the current to for equality if obj is also a + /// public override bool Equals(object obj) { return obj is SystemState other && Equals(other); } + /// public override int GetHashCode() { - return word.GetHashCode(); + return Word.GetHashCode(); + } + + /// + /// Compare the current to for equality + /// + private bool Equals(SystemState other) + { + return Word == other.Word; + } + + /// + /// Equals + /// + public static bool operator ==(SystemState left, SystemState right) + { + return left.Equals(right); + } + + /// + /// Not Equals + /// + public static bool operator !=(SystemState left, SystemState right) + { + return !(left == right); } } } diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs index 1ee6ec921..9706d76a6 100644 --- a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -37,7 +37,7 @@ public void OnThreadState( CancellationToken token = default) where FasterSession : IFasterSession { - switch (current.phase) + switch (current.Phase) { case Phase.PREPARE: if (ctx != null) @@ -49,17 +49,17 @@ public void OnThreadState( ctx.markers[EpochPhaseIdx.Prepare] = true; } - faster.epoch.Mark(EpochPhaseIdx.Prepare, current.version); + faster.epoch.Mark(EpochPhaseIdx.Prepare, current.Version); } - if (faster.epoch.CheckIsComplete(EpochPhaseIdx.Prepare, current.version)) + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.Prepare, current.Version)) faster.GlobalStateMachineStep(current); break; case Phase.IN_PROGRESS: if (ctx != null) { // Need to be very careful here as threadCtx is changing - var _ctx = prev.phase == Phase.IN_PROGRESS ? ctx.prevCtx : ctx; + var _ctx = prev.Phase == Phase.IN_PROGRESS ? ctx.prevCtx : ctx; var tokens = faster._hybridLogCheckpoint.info.checkpointTokens; if (!faster.SameCycle(ctx, current) || tokens == null) return; @@ -73,11 +73,11 @@ public void OnThreadState( ctx.prevCtx.markers[EpochPhaseIdx.InProgress] = true; } - faster.epoch.Mark(EpochPhaseIdx.InProgress, current.version); + faster.epoch.Mark(EpochPhaseIdx.InProgress, current.Version); } // Has to be prevCtx, not ctx - if (faster.epoch.CheckIsComplete(EpochPhaseIdx.InProgress, current.version)) + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.InProgress, current.Version)) faster.GlobalStateMachineStep(current); break; case Phase.WAIT_PENDING: @@ -91,10 +91,10 @@ public void OnThreadState( break; } - faster.epoch.Mark(EpochPhaseIdx.WaitPending, current.version); + faster.epoch.Mark(EpochPhaseIdx.WaitPending, current.Version); } - if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitPending, current.version)) + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitPending, current.Version)) faster.GlobalStateMachineStep(current); break; case Phase.REST: @@ -114,7 +114,7 @@ public void GlobalBeforeEnteringState( SystemState next, FasterKV faster) { - if (next.phase == Phase.REST) + if (next.Phase == Phase.REST) // Before leaving the checkpoint, make sure all previous versions are read-only. faster.hlog.ShiftReadOnlyToTail(out _, out _); } @@ -166,30 +166,30 @@ protected VersionChangeStateMachine(long targetVersion = -1, params ISynchroniza public override SystemState NextState(SystemState start) { var nextState = SystemState.Copy(ref start); - switch (start.phase) + switch (start.Phase) { case Phase.REST: - nextState.phase = Phase.PREPARE; + nextState.Phase = Phase.PREPARE; break; case Phase.PREPARE: - nextState.phase = Phase.IN_PROGRESS; + nextState.Phase = Phase.IN_PROGRESS; // 13 bits of 1s --- FASTER records only store 13 bits of version number, and we need to ensure that // the next version is distinguishable from the last in those 13 bits. var bitMask = (1L << 13) - 1; // If they are not distinguishable, simply increment target version to resolve this - if (((targetVersion - start.version) & bitMask) == 0) + if (((targetVersion - start.Version) & bitMask) == 0) targetVersion++; // TODO: Move to long for system state as well. - SetToVersion(targetVersion == -1 ? start.version + 1 : targetVersion); - nextState.version = (int) ToVersion(); + SetToVersion(targetVersion == -1 ? start.Version + 1 : targetVersion); + nextState.Version = (int) ToVersion(); break; case Phase.IN_PROGRESS: // This phase has no effect if using relaxed CPR model - nextState.phase = Phase.WAIT_PENDING; + nextState.Phase = Phase.WAIT_PENDING; break; case Phase.WAIT_PENDING: - nextState.phase = Phase.REST; + nextState.Phase = Phase.REST; break; default: throw new FasterException("Invalid Enum Argument"); diff --git a/cs/test/StateMachineTests.cs b/cs/test/StateMachineTests.cs index a82f6c754..63315cec1 100644 --- a/cs/test/StateMachineTests.cs +++ b/cs/test/StateMachineTests.cs @@ -568,7 +568,7 @@ public override void ReadCompletionCallback(ref AdId key, ref NumClicks input, r public class TestCallback : IStateMachineCallback { - private HashSet invokedStates = new(); + private readonly HashSet invokedStates = new(); public void BeforeEnteringState(SystemState next, FasterKV faster) diff --git a/cs/test/VariableLengthStructFASTERTests.cs b/cs/test/VariableLengthStructFASTERTests.cs index 286d8ca1d..e51f3ec70 100644 --- a/cs/test/VariableLengthStructFASTERTests.cs +++ b/cs/test/VariableLengthStructFASTERTests.cs @@ -13,7 +13,8 @@ internal class VariableLengthStructFASTERTests // VarLenMax is the variable-length portion; 2 is for the fixed fields const int VarLenMax = 10; const int StackAllocMax = VarLenMax + 2; - int GetVarLen(Random r) => r.Next(VarLenMax) + 2; + + static int GetVarLen(Random r) => r.Next(VarLenMax) + 2; [Test] [Category("FasterKV")] @@ -31,7 +32,7 @@ public unsafe void VariableLengthTest1() var s = fht.NewSession(new VLFunctions()); Input input = default; - Random r = new Random(100); + Random r = new(100); // Single alloc outside the loop, to the max length we'll need. int* val = stackalloc int[StackAllocMax]; @@ -92,7 +93,7 @@ public unsafe void VariableLengthTest2() var s = fht.NewSession(new VLFunctions2()); Input input = default; - Random r = new Random(100); + Random r = new(100); // Single alloc outside the loop, to the max length we'll need. int* keyval = stackalloc int[StackAllocMax]; diff --git a/cs/test/WaitForCommit.cs b/cs/test/WaitForCommit.cs index 17650afec..1d507823d 100644 --- a/cs/test/WaitForCommit.cs +++ b/cs/test/WaitForCommit.cs @@ -14,8 +14,8 @@ internal class WaitForCommitTests public IDevice device; private string path; static readonly byte[] entry = new byte[10]; - static readonly AutoResetEvent ev = new AutoResetEvent(false); - static readonly AutoResetEvent done = new AutoResetEvent(false); + static readonly AutoResetEvent ev = new(false); + static readonly AutoResetEvent done = new(false); [SetUp] public void Setup() @@ -49,7 +49,7 @@ public void TearDown() [Category("Smoke")] public void WaitForCommitBasicTest(string SyncTest) { - CancellationTokenSource cts = new CancellationTokenSource(); + CancellationTokenSource cts = new(); CancellationToken token = cts.Token; // make it small since launching each on separate threads