diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index 6f72ba684..ee7a168fc 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -73,26 +73,6 @@ internal async ValueTask CompletePendingAsync(ClientSession clientSession) - { - // We check if we are in normal mode - var newPhaseInfo = SystemState.Copy(ref _systemState); - if (clientSession.ctx.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && clientSession.ctx.version == newPhaseInfo.version) - { - return; - } - - // In non-checkpointing phases - if (newPhaseInfo.phase == Phase.PREPARE_GROW || newPhaseInfo.phase == Phase.IN_PROGRESS_GROW) - { - return; - } - - await HandleCheckpointingPhasesAsync(ctx, clientSession); - } - internal class ReadAsyncInternal { const int Completed = 1; @@ -258,7 +238,7 @@ static async ValueTask SlowReadAsync( } } - private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext toCtx, int version) + internal bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext toCtx, int version) { lock (toCtx) { @@ -279,266 +259,5 @@ private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext } return false; } - - private SystemState GetStartState(SystemState state) - { - if (state.phase <= Phase.REST) - return SystemState.Make(Phase.REST, state.version - 1); - else - return SystemState.Make(Phase.REST, state.version); - } - - private async ValueTask HandleCheckpointingPhasesAsync(FasterExecutionContext ctx, ClientSession clientSession, bool async = true, CancellationToken token = default) - { - if (async) - clientSession?.UnsafeResumeThread(); - - var finalState = SystemState.Copy(ref _systemState); - while (finalState.phase == Phase.INTERMEDIATE) - finalState = SystemState.Copy(ref _systemState); - - var previousState = ctx != null ? SystemState.Make(ctx.phase, ctx.version) : finalState; - - // We need to move from previousState to finalState one step at a time - var currentState = previousState; - - SystemState startState = GetStartState(finalState); - - if ((currentState.version < startState.version) || - (currentState.version == startState.version && currentState.phase < startState.phase)) - { - // Fast-forward to beginning of current checkpoint cycle - currentState = startState; - } - - do - { - switch (currentState.phase) - { - case Phase.PREP_INDEX_CHECKPOINT: - { - if (ctx != null) - { - if (!ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt]) - { - ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true; - } - epoch.Mark(EpochPhaseIdx.PrepareForIndexCheckpt, currentState.version); - } - - if (epoch.CheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - break; - } - case Phase.INDEX_CHECKPOINT: - { - if (_checkpointType == CheckpointType.INDEX_ONLY && ctx != null) - { - // Reseting the marker for a potential FULL or INDEX_ONLY checkpoint in the future - ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false; - } - - if (async && !IsIndexFuzzyCheckpointCompleted()) - { - clientSession?.UnsafeSuspendThread(); - await IsIndexFuzzyCheckpointCompletedAsync(token); - clientSession?.UnsafeResumeThread(); - } - GlobalMoveToNextCheckpointState(currentState); - - break; - } - case Phase.PREPARE: - { - if (ctx != null) - { - if (!ctx.markers[EpochPhaseIdx.Prepare]) - { - if (!RelaxedCPR) - { - AcquireSharedLatchesForAllPendingRequests(ctx); - } - ctx.markers[EpochPhaseIdx.Prepare] = true; - } - epoch.Mark(EpochPhaseIdx.Prepare, currentState.version); - } - - if (epoch.CheckIsComplete(EpochPhaseIdx.Prepare, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - - break; - } - case Phase.IN_PROGRESS: - { - if (ctx != null) - { - // Need to be very careful here as threadCtx is changing - FasterExecutionContext _ctx; - if (previousState.phase == Phase.IN_PROGRESS) - { - _ctx = ctx.prevCtx; - } - else - { - _ctx = ctx; - } - - if (!_ctx.markers[EpochPhaseIdx.InProgress]) - { - AtomicSwitch(ctx, ctx.prevCtx, _ctx.version); - InitContext(ctx, ctx.prevCtx.guid, ctx.prevCtx.serialNum); - - // Has to be prevCtx, not ctx - ctx.prevCtx.markers[EpochPhaseIdx.InProgress] = true; - } - - epoch.Mark(EpochPhaseIdx.InProgress, currentState.version); - } - - // Has to be prevCtx, not ctx - if (epoch.CheckIsComplete(EpochPhaseIdx.InProgress, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - break; - } - case Phase.WAIT_PENDING: - { - if (ctx != null) - { - if (!ctx.prevCtx.markers[EpochPhaseIdx.WaitPending]) - { - var notify = (ctx.prevCtx.HasNoPendingRequests); - - if (notify) - { - ctx.prevCtx.markers[EpochPhaseIdx.WaitPending] = true; - } - else - break; - } - epoch.Mark(EpochPhaseIdx.WaitPending, currentState.version); - } - - if (epoch.CheckIsComplete(EpochPhaseIdx.WaitPending, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - break; - } - case Phase.WAIT_FLUSH: - { - if (ctx == null || !ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush]) - { - bool notify; - - if (FoldOverSnapshot) - { - notify = (hlog.FlushedUntilAddress >= _hybridLogCheckpoint.info.finalLogicalAddress); - } - else - { - notify = (_hybridLogCheckpoint.flushedSemaphore != null) && _hybridLogCheckpoint.flushedSemaphore.CurrentCount > 0; - } - - if (async && !notify) - { - Debug.Assert(_hybridLogCheckpoint.flushedSemaphore != null); - clientSession?.UnsafeSuspendThread(); - await _hybridLogCheckpoint.flushedSemaphore.WaitAsync(token); - clientSession?.UnsafeResumeThread(); - - _hybridLogCheckpoint.flushedSemaphore.Release(); - - notify = true; - } - - if (_checkpointType == CheckpointType.FULL) - { - notify = notify && IsIndexFuzzyCheckpointCompleted(); - - if (async && !notify) - { - clientSession?.UnsafeSuspendThread(); - await IsIndexFuzzyCheckpointCompletedAsync(token); - clientSession?.UnsafeResumeThread(); - - notify = true; - } - } - - if (notify) - { - if (ctx != null) - ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush] = true; - } - else - break; - } - - if (ctx != null) - epoch.Mark(EpochPhaseIdx.WaitFlush, currentState.version); - - if (epoch.CheckIsComplete(EpochPhaseIdx.WaitFlush, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - break; - } - - case Phase.PERSISTENCE_CALLBACK: - { - if (ctx != null) - { - if (!ctx.prevCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback]) - { - if (ctx.prevCtx.serialNum != -1) - { - var commitPoint = new CommitPoint - { - UntilSerialNo = ctx.prevCtx.serialNum, - ExcludedSerialNos = ctx.prevCtx.excludedSerialNos - }; - - // Thread local action - functions.CheckpointCompletionCallback(ctx.guid, commitPoint); - if (clientSession != null) - clientSession.LatestCommitPoint = commitPoint; - } - ctx.prevCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true; - } - epoch.Mark(EpochPhaseIdx.CheckpointCompletionCallback, currentState.version); - } - if (epoch.CheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, currentState.version)) - { - GlobalMoveToNextCheckpointState(currentState); - } - break; - } - case Phase.REST: - { - break; - } - default: - throw new FasterException("Invalid state found during checkpointing"); - } - - if (ctx != null) - { - // update thread local variables - ctx.phase = currentState.phase; - ctx.version = currentState.version; - } - previousState.word = currentState.word; - currentState = GetNextState(currentState, _checkpointType); - } while (previousState.word != finalState.word); - - if (async) - clientSession?.UnsafeSuspendThread(); - } } } diff --git a/cs/src/core/ClientSession/FASTERClientSession.cs b/cs/src/core/ClientSession/FASTERClientSession.cs index cebbe8288..f98208a6a 100644 --- a/cs/src/core/ClientSession/FASTERClientSession.cs +++ b/cs/src/core/ClientSession/FASTERClientSession.cs @@ -16,7 +16,7 @@ public unsafe partial class FasterKV { - private Dictionary> _activeSessions; + internal Dictionary> _activeSessions; /// /// Start a new client session with FASTER. diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 53769f9f1..3cf554a26 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -34,10 +34,13 @@ internal enum OperationStatus internal class SerializedFasterExecutionContext { - public int version; - public long serialNum; - public string guid; + internal int version; + internal long serialNum; + internal string guid; + /// + /// + /// public void Write(StreamWriter writer) { writer.WriteLine(version); @@ -45,6 +48,9 @@ public void Write(StreamWriter writer) writer.WriteLine(serialNum); } + /// + /// + /// public void Load(StreamReader reader) { string value = reader.ReadLine(); @@ -56,7 +62,7 @@ public void Load(StreamReader reader) } } - public unsafe partial class FasterKV : FasterBase, IFasterKV + public partial class FasterKV : FasterBase, IFasterKV where Key : new() where Value : new() where Functions : IFunctions @@ -65,26 +71,20 @@ public unsafe partial class FasterKV key; - public IHeapContainer value; - public Input input; - public Output output; - public Context userContext; + internal IHeapContainer key; + internal IHeapContainer value; + internal Input input; + internal Output output; + internal Context userContext; // Some additional information about the previous attempt - - public long id; - - public int version; - - public long logicalAddress; - - public long serialNum; - - public HashBucketEntry entry; + internal long id; + internal int version; + internal long logicalAddress; + internal long serialNum; + internal HashBucketEntry entry; public void Dispose() { diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 6ad7ce1cf..4b940e41e 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -1,34 +1,32 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. - #pragma warning disable 0162 using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace FASTER.core { - public partial class FasterKV : FasterBase, IFasterKV + public partial class FasterKV : FasterBase, + IFasterKV where Key : new() where Value : new() where Functions : IFunctions { - private readonly Functions functions; - private readonly AllocatorBase hlog; + internal readonly Functions functions; + internal readonly AllocatorBase hlog; private readonly AllocatorBase readcache; private readonly IFasterEqualityComparer comparer; - private readonly bool UseReadCache = false; - private readonly bool CopyReadsToTail = false; - private readonly bool FoldOverSnapshot = false; - private readonly int sectorSize; - private readonly bool WriteDefaultOnDelete = false; - private bool RelaxedCPR = false; + internal readonly bool UseReadCache; + private readonly bool CopyReadsToTail; + private readonly bool FoldOverSnapshot; + internal readonly int sectorSize; + private readonly bool WriteDefaultOnDelete; + internal bool RelaxedCPR; /// /// Use relaxed version of CPR, where ops pending I/O @@ -62,20 +60,8 @@ public partial class FasterKV : F /// Read cache used by this FASTER instance /// public LogAccessor ReadCache { get; } - - private enum CheckpointType - { - INDEX_ONLY, - HYBRID_LOG_ONLY, - FULL - } - - private CheckpointType _checkpointType; - private Guid _indexCheckpointToken; - private Guid _hybridLogCheckpointToken; - private SystemState _systemState; - private HybridLogCheckpointInfo _hybridLogCheckpoint; - private ConcurrentDictionary _recoveredSessions; + + internal ConcurrentDictionary _recoveredSessions; /// /// Create FASTER instance @@ -87,7 +73,10 @@ private enum CheckpointType /// Log settings /// Checkpoint settings /// Serializer settings - public FasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings serializerSettings = null, IFasterEqualityComparer comparer = null, VariableLengthStructSettings variableLengthStructSettings = null) + public FasterKV(long size, Functions functions, LogSettings logSettings, + CheckpointSettings checkpointSettings = null, SerializerSettings serializerSettings = null, + IFasterEqualityComparer comparer = null, + VariableLengthStructSettings variableLengthStructSettings = null) { if (comparer != null) this.comparer = comparer; @@ -99,7 +88,8 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo } else { - Console.WriteLine("***WARNING*** Creating default FASTER key equality comparer based on potentially slow EqualityComparer.Default. To avoid this, provide a comparer (IFasterEqualityComparer) as an argument to FASTER's constructor, or make Key implement the interface IFasterEqualityComparer"); + Console.WriteLine( + "***WARNING*** Creating default FASTER key equality comparer based on potentially slow EqualityComparer.Default. To avoid this, provide a comparer (IFasterEqualityComparer) as an argument to FASTER's constructor, or make Key implement the interface IFasterEqualityComparer"); this.comparer = FasterEqualityComparer.Default; } } @@ -108,9 +98,11 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo checkpointSettings = new CheckpointSettings(); if (checkpointSettings.CheckpointDir != null && checkpointSettings.CheckpointManager != null) - throw new FasterException("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both"); + throw new FasterException( + "Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both"); - checkpointManager = checkpointSettings.CheckpointManager ?? new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? ""); + checkpointManager = checkpointSettings.CheckpointManager ?? + new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? ""); FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; CopyReadsToTail = logSettings.CopyReadsToTail; @@ -126,7 +118,8 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo { if (variableLengthStructSettings != null) { - hlog = new VariableLengthBlittableAllocator(logSettings, variableLengthStructSettings, this.comparer, null, epoch); + hlog = new VariableLengthBlittableAllocator(logSettings, variableLengthStructSettings, + this.comparer, null, epoch); Log = new LogAccessor(this, hlog); if (UseReadCache) { @@ -184,36 +177,35 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo hlog.Initialize(); - sectorSize = (int)logSettings.LogDevice.SectorSize; + sectorSize = (int) logSettings.LogDevice.SectorSize; Initialize(size, sectorSize); _systemState = default; _systemState.phase = Phase.REST; _systemState.version = 1; - _checkpointType = CheckpointType.HYBRID_LOG_ONLY; } /// /// Initiate full checkpoint /// /// Checkpoint token + /// upper limit (inclusive) of the version included /// /// Whether we successfully initiated the checkpoint (initiation may /// fail if we are already taking a checkpoint or performing some other /// operation such as growing the index). /// - public bool TakeFullCheckpoint(out Guid token) + public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) { - if (InternalTakeCheckpoint(CheckpointType.FULL)) - { - token = _indexCheckpointToken; - return true; - } + ISynchronizationTask backend; + if (FoldOverSnapshot) + backend = new FoldOverCheckpointTask(); else - { - token = default; - return false; - } + backend = new SnapshotCheckpointTask(); + + var result = StartStateMachine(new FullCheckpointStateMachine(backend, targetVersion)); + token = _hybridLogCheckpointToken; + return result; } /// @@ -223,35 +215,28 @@ public bool TakeFullCheckpoint(out Guid token) /// Whether we could initiate the checkpoint public bool TakeIndexCheckpoint(out Guid token) { - if (InternalTakeCheckpoint(CheckpointType.INDEX_ONLY)) - { - token = _indexCheckpointToken; - return true; - } - else - { - token = default; - return false; - } + var result = StartStateMachine(new IndexSnapshotStateMachine()); + token = _indexCheckpointToken; + return result; } /// /// Take hybrid log checkpoint /// /// Checkpoint token + /// upper limit (inclusive) of the version included /// Whether we could initiate the checkpoint - public bool TakeHybridLogCheckpoint(out Guid token) + public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) { - if (InternalTakeCheckpoint(CheckpointType.HYBRID_LOG_ONLY)) - { - token = _hybridLogCheckpointToken; - return true; - } + ISynchronizationTask backend; + if (FoldOverSnapshot) + backend = new FoldOverCheckpointTask(); else - { - token = default; - return false; - } + backend = new SnapshotCheckpointTask(); + + var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion)); + token = _hybridLogCheckpointToken; + return result; } /// @@ -295,35 +280,39 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default while (true) { var systemState = _systemState; - if (systemState.phase == Phase.REST || systemState.phase == Phase.PREPARE_GROW || systemState.phase == Phase.IN_PROGRESS_GROW) + if (systemState.phase == Phase.REST || systemState.phase == Phase.PREPARE_GROW || + systemState.phase == Phase.IN_PROGRESS_GROW) return; - await HandleCheckpointingPhasesAsync(null, null); + await ThreadStateMachineStep(null, null, true, token); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ContextRead(ref Key key, ref Input input, ref Output output, Context context, long serialNo, FasterExecutionContext sessionCtx) + internal Status ContextRead(ref Key key, ref Input input, ref Output output, Context context, long serialNo, + FasterExecutionContext sessionCtx) { var pcontext = default(PendingContext); - var internalStatus = InternalRead(ref key, ref input, ref output, ref context, ref pcontext, sessionCtx, serialNo); + var internalStatus = InternalRead(ref key, ref input, ref output, ref context, ref pcontext, sessionCtx, + serialNo); Status status; if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { - status = (Status)internalStatus; + status = (Status) internalStatus; } else { status = HandleOperationStatus(sessionCtx, sessionCtx, pcontext, internalStatus); } + sessionCtx.serialNum = serialNo; return status; } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ContextUpsert(ref Key key, ref Value value, Context context, long serialNo, FasterExecutionContext sessionCtx) + internal Status ContextUpsert(ref Key key, ref Value value, Context context, long serialNo, + FasterExecutionContext sessionCtx) { var pcontext = default(PendingContext); var internalStatus = InternalUpsert(ref key, ref value, ref context, ref pcontext, sessionCtx, serialNo); @@ -331,30 +320,33 @@ internal Status ContextUpsert(ref Key key, ref Value value, Context context, lon if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { - status = (Status)internalStatus; + status = (Status) internalStatus; } else { status = HandleOperationStatus(sessionCtx, sessionCtx, pcontext, internalStatus); } + sessionCtx.serialNum = serialNo; return status; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ContextRMW(ref Key key, ref Input input, Context context, long serialNo, FasterExecutionContext sessionCtx) + internal Status ContextRMW(ref Key key, ref Input input, Context context, long serialNo, + FasterExecutionContext sessionCtx) { var pcontext = default(PendingContext); var internalStatus = InternalRMW(ref key, ref input, ref context, ref pcontext, sessionCtx, serialNo); Status status; if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { - status = (Status)internalStatus; + status = (Status) internalStatus; } else { status = HandleOperationStatus(sessionCtx, sessionCtx, pcontext, internalStatus); } + sessionCtx.serialNum = serialNo; return status; } @@ -367,8 +359,9 @@ internal Status ContextDelete(ref Key key, Context context, long serialNo, Faste var status = default(Status); if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { - status = (Status)internalStatus; + status = (Status) internalStatus; } + sessionCtx.serialNum = serialNo; return status; } @@ -380,7 +373,7 @@ internal Status ContextDelete(ref Key key, Context context, long serialNo, Faste /// Whether the request succeeded public bool GrowIndex() { - return InternalGrowIndex(); + return StartStateMachine(new IndexResizeStateMachine()); } /// @@ -394,4 +387,4 @@ public void Dispose() readcache?.Dispose(); } } -} +} \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs index 7b35615a9..1c571b898 100644 --- a/cs/src/core/Index/FASTER/FASTERBase.cs +++ b/cs/src/core/Index/FASTER/FASTERBase.cs @@ -143,7 +143,6 @@ internal struct HashBucketEntry { [FieldOffset(0)] public long word; - public long Address { get @@ -158,7 +157,6 @@ public long Address } } - public ushort Tag { get @@ -326,7 +324,7 @@ public void Initialize(long size, int sector_size) /// /// /// - protected void Initialize(int version, long size, int sector_size) + internal void Initialize(int version, long size, int sector_size) { long size_bytes = size * sizeof(HashBucket); long aligned_size_bytes = sector_size + diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 0fa0cc9fd..2bd7f6858 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1991,9 +1991,7 @@ private void SplitBuckets(long hash) { // GC old version of hash table state[1 - resizeInfo.version] = default; - - long context = 0; - GlobalMoveToNextState(_systemState, SystemState.Make(Phase.REST, _systemState.version), ref context); + GlobalStateMachineStep(_systemState); return; } break; diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index ed2fa1d5c..649596894 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -79,14 +79,11 @@ internal void InternalRefresh(FasterExecutionContext ctx, ClientSession : IDis /// Take full checkpoint of FASTER /// /// Token describing checkpoint + /// upper limit (inclusive) of the version included /// Whether checkpoint was initiated - bool TakeFullCheckpoint(out Guid token); + bool TakeFullCheckpoint(out Guid token, long targetVersion = -1); /// /// Take checkpoint of FASTER index only (not log) @@ -156,8 +157,9 @@ public interface IFasterKV : IDis /// Take checkpoint of FASTER log only (not index) /// /// Token describing checkpoin + /// upper limit (inclusive) of the version included /// Whether checkpoint was initiated - bool TakeHybridLogCheckpoint(out Guid token); + bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1); /// /// Recover from last successfuly checkpoints diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 72f905cff..7f28bcdd9 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -27,516 +27,78 @@ public struct LinkedCheckpointInfo /// public Task NextTask; } - - public partial class FasterKV : FasterBase, IFasterKV - where Key : new() - where Value : new() - where Functions : IFunctions + + internal class EpochPhaseIdx { + public const int PrepareForIndexCheckpt = 0; - private TaskCompletionSource checkpointTcs - = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - internal Task CheckpointTask => checkpointTcs.Task; - - private class EpochPhaseIdx - { - public const int PrepareForIndexCheckpt = 0; - - public const int Prepare = 1; - - public const int InProgress = 2; - - public const int WaitPending = 3; - - public const int WaitFlush = 4; - - public const int CheckpointCompletionCallback = 5; - } - - #region Starting points - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InternalTakeCheckpoint(CheckpointType type) - { - if (_systemState.phase == Phase.REST) - { - var context = (long)type; - var currentState = SystemState.Make(Phase.REST, _systemState.version); - var nextState = GetNextState(currentState, type); - return GlobalMoveToNextState(currentState, nextState, ref context); - } - else - { - return false; - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InternalGrowIndex() - { - if (_systemState.phase == Phase.REST) - { - var version = _systemState.version; - long context = 0; - SystemState nextState = SystemState.Make(Phase.PREPARE_GROW, version); - if (GlobalMoveToNextState(SystemState.Make(Phase.REST, version), nextState, ref context)) - { - return true; - } - } - - return false; - } - #endregion - - /// - /// Global transition function that coordinates various state machines. - /// A few characteristics about the state machine: - /// - /// - /// - /// Transitions happen atomically using a compare-and-swap operation. So, multiple threads can try to do the same transition. Only one will succeed. - /// - /// - /// - /// - /// Transition from state A to B happens via an intermediate state (INTERMEDIATE). This serves as a lock by a thread to perform the transition. - /// Some transitions are accompanied by actions that must be performed before the transitions such as initializing contexts, etc. - /// - /// - /// - /// - /// States can be part of multiple state machines. For example: PREP_INDEX_CHECKPOINT is part of both index-only and full checkpoints. - /// - /// - /// - /// - /// We currently support 4 different state machines: - /// - /// - /// Index-Only Checkpoint - /// REST -> PREP_INDEX_CHECKPOINT -> INDEX_CHECKPOINT -> REST - /// - /// - /// HybridLog-Only Checkpoint - /// REST -> PREPARE -> IN_PROGRESS -> WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST - /// - /// - /// Full Checkpoint - /// REST -> PREP_INDEX_CHECKPOINT -> PREPARE -> IN_PROGRESS -> WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST - /// - /// - /// Grow - /// - /// - /// - /// - /// from state of the transition. - /// to state of the transition. - /// optional additioanl parameter for transition. - /// true if transition succeeds. - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool GlobalMoveToNextState(SystemState currentState, SystemState nextState, ref long context) - { - var intermediateState = SystemState.Make(Phase.INTERMEDIATE, currentState.version); - - // Move from S1 to I - if (MakeTransition(currentState, intermediateState)) - { - // Acquired ownership to make the transition from S1 to S2 - switch (nextState.phase) - { - case Phase.PREP_INDEX_CHECKPOINT: - { - _checkpointType = (CheckpointType)context; - - switch (_checkpointType) - { - case CheckpointType.INDEX_ONLY: - { - _indexCheckpointToken = Guid.NewGuid(); - InitializeIndexCheckpoint(_indexCheckpointToken); - break; - } - case CheckpointType.FULL: - { - var fullCheckpointToken = Guid.NewGuid(); - _indexCheckpointToken = fullCheckpointToken; - _hybridLogCheckpointToken = fullCheckpointToken; - InitializeIndexCheckpoint(_indexCheckpointToken); - InitializeHybridLogCheckpoint(_hybridLogCheckpointToken, currentState.version); - break; - } - default: - throw new FasterException(); - } - - ObtainCurrentTailAddress(ref _indexCheckpoint.info.startLogicalAddress); - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.INDEX_CHECKPOINT: - { - if (UseReadCache && this.ReadCache.BeginAddress != this.ReadCache.TailAddress) - { - throw new FasterException("Index checkpoint with read cache is not supported"); - } - TakeIndexFuzzyCheckpoint(); - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.PREPARE: - { - switch (currentState.phase) - { - case Phase.REST: - { - _checkpointType = (CheckpointType)context; - - Debug.Assert(_checkpointType == CheckpointType.HYBRID_LOG_ONLY); - _hybridLogCheckpointToken = Guid.NewGuid(); - InitializeHybridLogCheckpoint(_hybridLogCheckpointToken, currentState.version); - break; - } - case Phase.PREP_INDEX_CHECKPOINT: - { - if (UseReadCache && this.ReadCache.BeginAddress != this.ReadCache.TailAddress) - { - throw new FasterException("Index checkpoint with read cache is not supported"); - } - TakeIndexFuzzyCheckpoint(); - break; - } - default: - throw new FasterException(); - } - - ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.startLogicalAddress); - - if (!FoldOverSnapshot) - { - _hybridLogCheckpoint.info.flushedLogicalAddress = hlog.FlushedUntilAddress; - _hybridLogCheckpoint.info.useSnapshotFile = 1; - } - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.IN_PROGRESS: - { - MakeTransition(intermediateState, nextState); - break; - } - case Phase.WAIT_PENDING: - { - MakeTransition(intermediateState, nextState); - break; - } - case Phase.WAIT_FLUSH: - { - if (_checkpointType == CheckpointType.FULL) - { - _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); - ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - } - - _hybridLogCheckpoint.info.headAddress = hlog.HeadAddress; - _hybridLogCheckpoint.info.beginAddress = hlog.BeginAddress; + public const int Prepare = 1; - if (FoldOverSnapshot) - { - hlog.ShiftReadOnlyToTail(out long tailAddress, out _hybridLogCheckpoint.flushedSemaphore); - _hybridLogCheckpoint.info.finalLogicalAddress = tailAddress; - } - else - { - ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.finalLogicalAddress); + public const int InProgress = 2; - _hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken); - _hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken); - _hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize()); - _hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1); + public const int WaitPending = 3; - long startPage = hlog.GetPage(_hybridLogCheckpoint.info.flushedLogicalAddress); - long endPage = hlog.GetPage(_hybridLogCheckpoint.info.finalLogicalAddress); - if (_hybridLogCheckpoint.info.finalLogicalAddress > hlog.GetStartLogicalAddress(endPage)) - { - endPage++; - } + public const int WaitFlush = 4; - // This can be run on a new thread if we want to immediately parallelize - // the rest of the log flush - hlog.AsyncFlushPagesToDevice( - startPage, - endPage, - _hybridLogCheckpoint.info.finalLogicalAddress, - _hybridLogCheckpoint.snapshotFileDevice, - _hybridLogCheckpoint.snapshotFileObjectLogDevice, - out _hybridLogCheckpoint.flushedSemaphore); - } - - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.PERSISTENCE_CALLBACK: - { - // Collect object log offsets only after flushes - // are completed - var seg = hlog.GetSegmentOffsets(); - if (seg != null) - { - _hybridLogCheckpoint.info.objectLogSegmentOffsets = new long[seg.Length]; - Array.Copy(seg, _hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length); - } - - if (_activeSessions != null) - { - // write dormant sessions to checkpoint - foreach (var kvp in _activeSessions) - { - AtomicSwitch(kvp.Value.ctx, kvp.Value.ctx.prevCtx, currentState.version - 1); - } - } - WriteHybridLogMetaInfo(); - - if (_checkpointType == CheckpointType.FULL) - WriteIndexMetaInfo(); - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.PREPARE_GROW: - { - // Note that the transition must be done before bumping epoch here! - MakeTransition(intermediateState, nextState); - epoch.BumpCurrentEpoch(() => - { - long _context = 0; - GlobalMoveToNextState(nextState, SystemState.Make(Phase.IN_PROGRESS_GROW, nextState.version), ref _context); - }); - break; - } - case Phase.IN_PROGRESS_GROW: - { - // Set up the transition to new version of HT - int numChunks = (int)(state[resizeInfo.version].size / Constants.kSizeofChunk); - if (numChunks == 0) numChunks = 1; // at least one chunk - - numPendingChunksToBeSplit = numChunks; - splitStatus = new long[numChunks]; - - Initialize(1 - resizeInfo.version, state[resizeInfo.version].size * 2, sectorSize); - - resizeInfo.version = 1 - resizeInfo.version; - - MakeTransition(intermediateState, nextState); - break; - } - case Phase.REST: - { - switch (_checkpointType) - { - case CheckpointType.INDEX_ONLY: - { - _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); - ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - WriteIndexMetaInfo(); - _indexCheckpoint.Reset(); - break; - } - case CheckpointType.FULL: - { - _indexCheckpoint.Reset(); - _hybridLogCheckpoint.Reset(); - break; - } - case CheckpointType.HYBRID_LOG_ONLY: - { - _hybridLogCheckpoint.Reset(); - break; - } - default: - throw new FasterException(); - } - - var nextTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task }); - checkpointTcs = nextTcs; - - MakeTransition(intermediateState, nextState); - break; - } - } - return true; - } - else - { - return false; - } - } - - /// - /// Corresponding thread-local actions that must be performed when any state machine is active - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void HandleCheckpointingPhases(FasterExecutionContext ctx, ClientSession clientSession) - { - var _ = HandleCheckpointingPhasesAsync(ctx, clientSession, false); - return; - } + public const int CheckpointCompletionCallback = 5; + } - #region Helper functions - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool GlobalMoveToNextCheckpointState(SystemState currentState) - { - long context = 0; - return GlobalMoveToNextState(currentState, GetNextState(currentState, _checkpointType), ref context); - } + public partial class FasterKV + where Key : new() + where Value : new() + where Functions : IFunctions + { + + internal TaskCompletionSource checkpointTcs + = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + internal Guid _indexCheckpointToken; + internal Guid _hybridLogCheckpointToken; + internal HybridLogCheckpointInfo _hybridLogCheckpoint; - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool MakeTransition(SystemState currentState, SystemState nextState) - { - // Move from I to P2 - if (Interlocked.CompareExchange(ref _systemState.word, nextState.word, currentState.word) == currentState.word) - { - Debug.WriteLine("Moved to {0}, {1}", nextState.phase, nextState.version); - return true; - } - else - { - return false; - } - } + internal Task CheckpointTask => checkpointTcs.Task; - private void AcquireSharedLatchesForAllPendingRequests(FasterExecutionContext ctx) + internal void AcquireSharedLatchesForAllPendingRequests(FasterExecutionContext ctx) { foreach (var _ctx in ctx.retryRequests) { AcquireSharedLatch(_ctx.key.Get()); } + foreach (var _ctx in ctx.ioPendingRequests.Values) { AcquireSharedLatch(_ctx.key.Get()); } } + - /* - * We have several state machines supported by this function. - * Full Checkpoint: - * REST -> PREP_INDEX_CHECKPOINT -> PREPARE -> IN_PROGRESS - * -> WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST - * - * Index Checkpoint: - * REST -> PREP_INDEX_CHECKPOINT -> INDEX_CHECKPOINT -> REST - * - * Hybrid Log Checkpoint: - * REST -> PREPARE -> IN_PROGRESS -> WAIT_PENDING -> WAIT_FLUSH -> - * -> PERSISTENCE_CALLBACK -> REST - * - * Grow : - * REST -> PREPARE_GROW -> IN_PROGRESS_GROW -> REST - * - * GC: - * REST -> GC -> REST - */ - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private SystemState GetNextState(SystemState start, CheckpointType type = CheckpointType.FULL) - { - - var nextState = default(SystemState); - nextState.word = start.word; - switch (start.phase) - { - case Phase.REST: - switch (type) - { - case CheckpointType.HYBRID_LOG_ONLY: - nextState.phase = Phase.PREPARE; - break; - case CheckpointType.FULL: - case CheckpointType.INDEX_ONLY: - nextState.phase = Phase.PREP_INDEX_CHECKPOINT; - break; - } - break; - case Phase.PREP_INDEX_CHECKPOINT: - switch (type) - { - case CheckpointType.INDEX_ONLY: - nextState.phase = Phase.INDEX_CHECKPOINT; - break; - case CheckpointType.FULL: - nextState.phase = Phase.PREPARE; - break; - } - break; - case Phase.INDEX_CHECKPOINT: - switch(type) - { - case CheckpointType.FULL: - nextState.phase = Phase.PREPARE; - break; - default: - nextState.phase = Phase.REST; - nextState.version = start.version + 1; - break; - } - break; - case Phase.PREPARE: - nextState.phase = Phase.IN_PROGRESS; - nextState.version = start.version + 1; - break; - case Phase.IN_PROGRESS: - nextState.phase = RelaxedCPR ? Phase.WAIT_FLUSH : Phase.WAIT_PENDING; - break; - case Phase.WAIT_PENDING: - nextState.phase = Phase.WAIT_FLUSH; - break; - case Phase.WAIT_FLUSH: - nextState.phase = Phase.PERSISTENCE_CALLBACK; - break; - case Phase.PERSISTENCE_CALLBACK: - nextState.phase = Phase.REST; - break; - case Phase.PREPARE_GROW: - nextState.phase = Phase.IN_PROGRESS_GROW; - break; - case Phase.IN_PROGRESS_GROW: - nextState.phase = Phase.REST; - break; - } - return nextState; - } - - private void WriteHybridLogMetaInfo() + internal void WriteHybridLogMetaInfo() { checkpointManager.CommitLogCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.ToByteArray()); } - private void WriteIndexMetaInfo() + internal void WriteIndexMetaInfo() { checkpointManager.CommitIndexCheckpoint(_indexCheckpointToken, _indexCheckpoint.info.ToByteArray()); } - private bool ObtainCurrentTailAddress(ref long location) + internal bool ObtainCurrentTailAddress(ref long location) { var tailAddress = hlog.GetTailAddress(); return Interlocked.CompareExchange(ref location, tailAddress, 0) == 0; } - private void InitializeIndexCheckpoint(Guid indexToken) + internal void InitializeIndexCheckpoint(Guid indexToken) { _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, checkpointManager); } - private void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) + internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) { _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); } - #endregion + // #endregion } -} +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/FasterStateMachine.cs b/cs/src/core/Index/Synchronization/FasterStateMachine.cs new file mode 100644 index 000000000..bc4f3fbd0 --- /dev/null +++ b/cs/src/core/Index/Synchronization/FasterStateMachine.cs @@ -0,0 +1,163 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + public partial class FasterKV + where Key : new() + where Value : new() + where Functions : IFunctions + { + // The current system state, defined as the combination of a phase and a version number. This value + // is observed by all sessions and a state machine communicates its progress to sessions through + // this value + private SystemState _systemState; + // This flag ensures that only one state machine is active at a given time. + private volatile int stateMachineActive = 0; + // The current state machine in the system. The value could be stale and point to the previous state machine + // if no state machine is active at this time. + private ISynchronizationStateMachine currentSyncStateMachine; + + /// + /// Attempt to start the given state machine in the system if no other state machine is active. + /// + /// The state machine to start + /// true if the state machine has started, false otherwise + private bool StartStateMachine(ISynchronizationStateMachine stateMachine) + { + // return immediately if there is a state machine under way. + if (Interlocked.CompareExchange(ref stateMachineActive, 1, 0) != 0) return false; + + currentSyncStateMachine = stateMachine; + // No latch required because the taskMutex guards against other tasks starting, and only a new task + // is allowed to change faster global state from REST + GlobalStateMachineStep(_systemState); + return true; + } + + // Atomic transition from expectedState -> nextState + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool MakeTransition(SystemState expectedState, SystemState nextState) + { + if (Interlocked.CompareExchange(ref _systemState.word, nextState.word, expectedState.word) != + expectedState.word) return false; + Debug.WriteLine("Moved to {0}, {1}", nextState.phase, nextState.version); + return true; + } + + /// + /// Steps the global state machine. This will change the current global system state and perform some actions + /// as prescribed by the current state machine. This function has no effect if the current state is not + /// the given expected state. + /// + /// expected current global state + internal void GlobalStateMachineStep(SystemState expectedState) + { + // Between state transition, temporarily block any concurrent execution thread from progressing to prevent + // perceived inconsistencies + Debug.Assert(expectedState.phase != Phase.INTERMEDIATE, "Cannot step from intermediate"); + var intermediate = SystemState.Make(Phase.INTERMEDIATE, expectedState.version); + if (!MakeTransition(expectedState, intermediate)) return; + + var nextState = currentSyncStateMachine.NextState(expectedState); + + // Execute custom task logic + currentSyncStateMachine.GlobalBeforeEnteringState(nextState, this); + var success = MakeTransition(intermediate, nextState); + // Guaranteed to succeed, because other threads will always block while the system is in intermediate. + Debug.Assert(success); + currentSyncStateMachine.GlobalAfterEnteringState(nextState, this); + + // Mark the state machine done as we exit the state machine. + if (nextState.phase == Phase.REST) stateMachineActive = 0; + } + + + // Given the current global state, return the starting point of the state machine cycle + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private SystemState StartOfCurrentCycle(SystemState currentGlobalState) + { + return currentGlobalState.phase <= Phase.REST + ? SystemState.Make(Phase.REST, currentGlobalState.version - 1) + : SystemState.Make(Phase.REST, currentGlobalState.version); + } + + // Given the current thread state and global state, fast forward the thread state to the + // current state machine cycle if needed + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private SystemState FastForwardToCurrentCycle(SystemState currentThreadState, SystemState currentGlobalState) + { + var startState = StartOfCurrentCycle(currentGlobalState); + if (currentThreadState.version < startState.version || + currentThreadState.version == startState.version && currentThreadState.phase < startState.phase) + { + return startState; + } + + return currentThreadState; + } + + // Return the pair of current state machine and global state, guaranteed to be captured atomicaly. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private (ISynchronizationStateMachine, SystemState) CaptureTaskAndTargetState() + { + while (true) + { + var task = currentSyncStateMachine; + var targetState = SystemState.Copy(ref _systemState); + // We have to make sure that we are not looking at a state resulted from a different + // task. It's ok to be behind when the thread steps through the state machine, but not + // ok if we are using the wrong task. + if (targetState.phase != Phase.INTERMEDIATE && currentSyncStateMachine == task) + return ValueTuple.Create(task, targetState); + } + } + + /// + /// Steps the thread's local state machine. Threads catch up to the current global state and performs + /// necessary actions associated with the state as defined by the current state machine + /// + /// null if calling without a context (e.g. waiting on a checkpoint) + /// null if calling without a session (e.g. waiting on a checkpoint) + /// + /// + /// + private async ValueTask ThreadStateMachineStep(FasterExecutionContext ctx, + ClientSession clientSession, + bool async = true, + CancellationToken token = default) + { + if (async) + clientSession?.UnsafeResumeThread(); + + // Target state is the current (non-intermediate state) system state thread needs to catch up to + var (currentTask, targetState) = CaptureTaskAndTargetState(); + + // the current thread state is what the thread remembers, or simply what the current system + // is if we are calling from somewhere other than an execution thread (e.g. waiting on + // a checkpoint to complete on a client app thread) + var threadState = ctx == null ? targetState : SystemState.Make(ctx.phase, ctx.version); + + // If the thread was in the middle of handling some older, unrelated task, fast-forward to the current task + // as the old one is no longer relevant + threadState = FastForwardToCurrentCycle(threadState, targetState); + var previousState = threadState; + do + { + await currentTask.OnThreadEnteringState(threadState, previousState, this, ctx, clientSession, async, + token); + if (ctx != null) + { + ctx.phase = threadState.phase; + ctx.version = threadState.version; + } + + previousState.word = threadState.word; + threadState = currentTask.NextState(threadState); + } while (previousState.word != targetState.word); + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs new file mode 100644 index 000000000..70c0b4a0a --- /dev/null +++ b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs @@ -0,0 +1,122 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// This task contains logic to orchestrate the index and hybrid log checkpoint in parallel + /// + internal class FullCheckpointOrchestrationTask : ISynchronizationTask + { + /// + public void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (next.phase) + { + case Phase.PREP_INDEX_CHECKPOINT: + Debug.Assert(faster._indexCheckpointToken == default && + faster._hybridLogCheckpointToken == default); + var fullCheckpointToken = Guid.NewGuid(); + faster._indexCheckpointToken = fullCheckpointToken; + faster._hybridLogCheckpointToken = fullCheckpointToken; + faster.InitializeIndexCheckpoint(faster._indexCheckpointToken); + faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version); + break; + case Phase.PREPARE: + if (faster.UseReadCache && faster.ReadCache.BeginAddress != faster.ReadCache.TailAddress) + throw new FasterException("Index checkpoint with read cache is not supported"); + faster.TakeIndexFuzzyCheckpoint(); + break; + case Phase.WAIT_FLUSH: + faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress(); + faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.finalLogicalAddress); + break; + case Phase.PERSISTENCE_CALLBACK: + faster.WriteIndexMetaInfo(); + faster._indexCheckpointToken = default; + break; + } + } + + /// + public void GlobalAfterEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + } + + /// + public async ValueTask OnThreadState(SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) where Key : new() + where Value : new() + where Functions : IFunctions + { + if (current.phase != Phase.WAIT_INDEX_CHECKPOINT) return; + + if (async && !faster.IsIndexFuzzyCheckpointCompleted()) + { + clientSession?.UnsafeSuspendThread(); + await faster.IsIndexFuzzyCheckpointCompletedAsync(token); + clientSession?.UnsafeResumeThread(); + } + + faster.GlobalStateMachineStep(current); + } + } + + /// + /// The state machine orchestrates a full checkpoint + /// + internal class FullCheckpointStateMachine : HybridLogCheckpointStateMachine + { + /// + /// Construct a new FullCheckpointStateMachine to use the given checkpoint backend (either fold-over or snapshot), + /// drawing boundary at targetVersion. + /// + /// A task that encapsulates the logic to persist the checkpoint + /// upper limit (inclusive) of the version included + public FullCheckpointStateMachine(ISynchronizationTask checkpointBackend, long targetVersion = -1) : base( + targetVersion, new VersionChangeTask(), new FullCheckpointOrchestrationTask(), checkpointBackend, + new IndexSnapshotTask()) {} + + /// + public override SystemState NextState(SystemState start) + { + var result = SystemState.Copy(ref start); + switch (start.phase) + { + case Phase.REST: + result.phase = Phase.PREP_INDEX_CHECKPOINT; + break; + case Phase.PREP_INDEX_CHECKPOINT: + result.phase = Phase.PREPARE; + break; + case Phase.WAIT_FLUSH: + result.phase = Phase.WAIT_INDEX_CHECKPOINT; + break; + case Phase.WAIT_INDEX_CHECKPOINT: + result.phase = Phase.PERSISTENCE_CALLBACK; + break; + default: + result = base.NextState(start); + break; + } + + return result; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs new file mode 100644 index 000000000..c3af4be3c --- /dev/null +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -0,0 +1,311 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// This task is the base class for a checkpoint "backend", which decides how a captured version is + /// persisted on disk. + /// + internal abstract class HybridLogCheckpointOrchestrationTask : ISynchronizationTask + { + /// + public virtual void GlobalBeforeEnteringState(SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (next.phase) + { + case Phase.PREPARE: + if (faster._hybridLogCheckpointToken == default) + { + faster._hybridLogCheckpointToken = Guid.NewGuid(); + faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version); + } + + faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.startLogicalAddress); + break; + case Phase.WAIT_FLUSH: + faster._hybridLogCheckpoint.info.headAddress = faster.hlog.HeadAddress; + faster._hybridLogCheckpoint.info.beginAddress = faster.hlog.BeginAddress; + break; + case Phase.PERSISTENCE_CALLBACK: + // Collect object log offsets only after flushes + // are completed + var seg = faster.hlog.GetSegmentOffsets(); + if (seg != null) + { + faster._hybridLogCheckpoint.info.objectLogSegmentOffsets = new long[seg.Length]; + Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length); + } + + if (faster._activeSessions != null) + { + // write dormant sessions to checkpoint + foreach (var kvp in faster._activeSessions) + { + faster.AtomicSwitch(kvp.Value.ctx, kvp.Value.ctx.prevCtx, next.version - 1); + } + } + + faster.WriteHybridLogMetaInfo(); + break; + case Phase.REST: + faster._hybridLogCheckpointToken = default; + faster._hybridLogCheckpoint.Reset(); + var nextTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + faster.checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task }); + faster.checkpointTcs = nextTcs; + break; + } + } + + /// + public virtual void GlobalAfterEnteringState(SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + } + + /// + public virtual ValueTask OnThreadState( + SystemState current, + SystemState prev, FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions + { + if (current.phase != Phase.PERSISTENCE_CALLBACK) return default; + + if (ctx != null) + { + if (!ctx.prevCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback]) + { + if (ctx.prevCtx.serialNum != -1) + { + var commitPoint = new CommitPoint + { + UntilSerialNo = ctx.prevCtx.serialNum, + ExcludedSerialNos = ctx.prevCtx.excludedSerialNos + }; + + // Thread local action + faster.functions.CheckpointCompletionCallback(ctx.guid, commitPoint); + if (clientSession != null) clientSession.LatestCommitPoint = commitPoint; + } + + ctx.prevCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true; + } + + faster.epoch.Mark(EpochPhaseIdx.CheckpointCompletionCallback, current.version); + } + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, current.version)) + faster.GlobalStateMachineStep(current); + return default; + } + } + + /// + /// A FoldOver checkpoint persists a version by setting the read-only marker past the last entry of that + /// version on the log and waiting until it is flushed to disk. It is simple and fast, but can result + /// in garbage entries on the log, and a slower recovery of performance. + /// + internal class FoldOverCheckpointTask : HybridLogCheckpointOrchestrationTask + { + /// + public override void GlobalBeforeEnteringState(SystemState next, + FasterKV faster) + { + base.GlobalBeforeEnteringState(next, faster); + if (next.phase != Phase.WAIT_FLUSH) return; + + faster.hlog.ShiftReadOnlyToTail(out var tailAddress, + out faster._hybridLogCheckpoint.flushedSemaphore); + faster._hybridLogCheckpoint.info.finalLogicalAddress = tailAddress; + } + + /// + public override async ValueTask OnThreadState( + SystemState current, + SystemState prev, FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) + { + await base.OnThreadState(current, prev, faster, ctx, clientSession, async, token); + if (current.phase != Phase.WAIT_FLUSH) return; + + if (ctx == null || !ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush]) + { + var notify = faster.hlog.FlushedUntilAddress >= + faster._hybridLogCheckpoint.info.finalLogicalAddress; + + if (async && !notify) + { + Debug.Assert(faster._hybridLogCheckpoint.flushedSemaphore != null); + clientSession?.UnsafeSuspendThread(); + await faster._hybridLogCheckpoint.flushedSemaphore.WaitAsync(token); + clientSession?.UnsafeResumeThread(); + faster._hybridLogCheckpoint.flushedSemaphore.Release(); + notify = true; + } + + if (!notify) return; + + if (ctx != null) + ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush] = true; + } + + if (ctx != null) + faster.epoch.Mark(EpochPhaseIdx.WaitFlush, current.version); + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitFlush, current.version)) + faster.GlobalStateMachineStep(current); + } + } + + /// + /// A Snapshot persists a version by making a copy for every entry of that version separate from the log. It is + /// slower and more complex than a foldover, but more space-efficient on the log, and retains in-place + /// update performance as it does not advance the readonly marker unnecessarily. + /// + internal class SnapshotCheckpointTask : HybridLogCheckpointOrchestrationTask + { + /// + public override void GlobalBeforeEnteringState(SystemState next, + FasterKV faster) + { + base.GlobalBeforeEnteringState(next, faster); + switch (next.phase) + { + case Phase.PREPARE: + faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress; + faster._hybridLogCheckpoint.info.useSnapshotFile = 1; + break; + case Phase.WAIT_FLUSH: + faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.finalLogicalAddress); + + faster._hybridLogCheckpoint.snapshotFileDevice = + faster.checkpointManager.GetSnapshotLogDevice(faster._hybridLogCheckpointToken); + faster._hybridLogCheckpoint.snapshotFileObjectLogDevice = + faster.checkpointManager.GetSnapshotObjectLogDevice(faster._hybridLogCheckpointToken); + faster._hybridLogCheckpoint.snapshotFileDevice.Initialize(faster.hlog.GetSegmentSize()); + faster._hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1); + + long startPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.flushedLogicalAddress); + long endPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.finalLogicalAddress); + if (faster._hybridLogCheckpoint.info.finalLogicalAddress > + faster.hlog.GetStartLogicalAddress(endPage)) + { + endPage++; + } + + // This can be run on a new thread if we want to immediately parallelize + // the rest of the log flush + faster.hlog.AsyncFlushPagesToDevice( + startPage, + endPage, + faster._hybridLogCheckpoint.info.finalLogicalAddress, + faster._hybridLogCheckpoint.snapshotFileDevice, + faster._hybridLogCheckpoint.snapshotFileObjectLogDevice, + out faster._hybridLogCheckpoint.flushedSemaphore); + break; + } + } + + /// + public override async ValueTask OnThreadState( + SystemState current, + SystemState prev, FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) + { + await base.OnThreadState(current, prev, faster, ctx, clientSession, async, token); + if (current.phase != Phase.WAIT_FLUSH) return; + + if (ctx == null || !ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush]) + { + var notify = faster._hybridLogCheckpoint.flushedSemaphore != null && + faster._hybridLogCheckpoint.flushedSemaphore.CurrentCount > 0; + + if (async && !notify) + { + Debug.Assert(faster._hybridLogCheckpoint.flushedSemaphore != null); + clientSession?.UnsafeSuspendThread(); + await faster._hybridLogCheckpoint.flushedSemaphore.WaitAsync(token); + clientSession?.UnsafeResumeThread(); + faster._hybridLogCheckpoint.flushedSemaphore.Release(); + notify = true; + } + + if (!notify) return; + + if (ctx != null) + ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush] = true; + } + + if (ctx != null) + faster.epoch.Mark(EpochPhaseIdx.WaitFlush, current.version); + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitFlush, current.version)) + faster.GlobalStateMachineStep(current); + } + } + + /// + /// + /// + internal class HybridLogCheckpointStateMachine : VersionChangeStateMachine + { + /// + /// Construct a new HybridLogCheckpointStateMachine to use the given checkpoint backend (either fold-over or + /// snapshot), drawing boundary at targetVersion. + /// + /// A task that encapsulates the logic to persist the checkpoint + /// upper limit (inclusive) of the version included + public HybridLogCheckpointStateMachine(ISynchronizationTask checkpointBackend, long targetVersion = -1) + : base(targetVersion, new VersionChangeTask(), checkpointBackend) {} + + /// + /// Construct a new HybridLogCheckpointStateMachine with the given tasks. Does not load any tasks by default. + /// + /// upper limit (inclusive) of the version included + /// The tasks to load onto the state machine + protected HybridLogCheckpointStateMachine(long targetVersion, params ISynchronizationTask[] tasks) + : base(targetVersion, tasks) {} + + /// + public override SystemState NextState(SystemState start) + { + var result = SystemState.Copy(ref start); + switch (start.phase) + { + case Phase.WAIT_PENDING: + result.phase = Phase.WAIT_FLUSH; + break; + case Phase.WAIT_FLUSH: + result.phase = Phase.PERSISTENCE_CALLBACK; + break; + case Phase.PERSISTENCE_CALLBACK: + result.phase = Phase.REST; + break; + default: + result = base.NextState(start); + break; + } + + return result; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/ISynchronizationStateMachine.cs b/cs/src/core/Index/Synchronization/ISynchronizationStateMachine.cs new file mode 100644 index 000000000..866f1456a --- /dev/null +++ b/cs/src/core/Index/Synchronization/ISynchronizationStateMachine.cs @@ -0,0 +1,220 @@ +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// A state machine defines a serious of actions that changes the system, which requires all sessions to + /// synchronize and agree on certain time points. A full run of the state machine is defined as a cycle + /// starting from REST and ending in REST, and only one state machine can be active at a given time. + /// + internal interface ISynchronizationStateMachine + { + /// + /// This function models the transition function of a state machine. + /// + /// The current state of the state machine + /// the next state in this state machine + SystemState NextState(SystemState start); + + /// + /// This function is invoked immediately before the global state machine enters the given state. + /// + /// + /// + /// + /// + /// + /// + /// + /// + void GlobalBeforeEnteringState(SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions; + + /// + /// This function is invoked immediately after the global state machine enters the given state. + /// + /// + /// + /// + /// + /// + /// + /// + /// + void GlobalAfterEnteringState(SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions; + + /// + /// This function is invoked for every thread when they refresh and observe a given state. + /// + /// Note that the function is not allowed to await when async is set to false. + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + ValueTask OnThreadEnteringState(SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, + bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions; + } + + + /// + /// An ISynchronizationTask specifies logic to be run on a state machine, but does not specify a transition + /// function. It is therefore possible to write common logic in an ISynchronizationTask and reuse it across + /// multiple state machines, or to choose the task at runtime and achieve polymorphism in the behavior + /// of a concrete state machine class. + /// + internal interface ISynchronizationTask + { + /// + /// This function is invoked immediately before the global state machine enters the given state. + /// + /// + /// + /// + /// + /// + /// + /// + /// + void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions; + + /// + /// This function is invoked immediately after the global state machine enters the given state. + /// + /// + /// + /// + /// + /// + /// + /// + /// + void GlobalAfterEnteringState( + SystemState next, + FasterKV faster) + where Key : new () + where Value : new () + where Functions : IFunctions; + + /// + /// This function is invoked for every thread when they refresh and observe a given state. + /// + /// Note that the function is not allowed to await when async is set to false. + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + ValueTask OnThreadState( + SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, + bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions; + } + + + /// + /// Abstract base class for ISynchronizationStateMachine that implements that state machine logic + /// with ISynchronizationTasks + /// + internal abstract class SynchronizationStateMachineBase : ISynchronizationStateMachine + { + private ISynchronizationTask[] tasks; + + /// + /// Construct a new SynchronizationStateMachine with the given tasks. The order of tasks given is the + /// order they are executed on each state machine. + /// + /// The ISynchronizationTasks to run on the state machine + protected SynchronizationStateMachineBase(params ISynchronizationTask[] tasks) + { + this.tasks = tasks; + } + + /// + public abstract SystemState NextState(SystemState start); + + /// + public void GlobalBeforeEnteringState(SystemState next, + FasterKV faster) where Key : new() + where Value : new() + where Functions : IFunctions + { + foreach (var task in tasks) + task.GlobalBeforeEnteringState(next, faster); + } + + /// + public void GlobalAfterEnteringState(SystemState next, + FasterKV faster) where Key : new() + where Value : new() + where Functions : IFunctions + { + foreach (var task in tasks) + task.GlobalAfterEnteringState(next, faster); + } + + /// + public async ValueTask OnThreadEnteringState( + SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) where Key : new() + where Value : new() + where Functions : IFunctions + { + foreach (var task in tasks) + await task.OnThreadState(current, prev, faster, ctx, clientSession, async, token); + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs b/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs new file mode 100644 index 000000000..28cc42bdd --- /dev/null +++ b/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs @@ -0,0 +1,125 @@ +using System.ComponentModel; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// Resizes an index + /// + internal class IndexResizeTask : ISynchronizationTask + { + /// + public void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (next.phase) + { + case Phase.PREPARE_GROW: + // nothing to do + break; + case Phase.IN_PROGRESS_GROW: + // Set up the transition to new version of HT + var numChunks = (int) (faster.state[faster.resizeInfo.version].size / Constants.kSizeofChunk); + if (numChunks == 0) numChunks = 1; // at least one chunk + + faster.numPendingChunksToBeSplit = numChunks; + faster.splitStatus = new long[numChunks]; + + faster.Initialize(1 - faster.resizeInfo.version, faster.state[faster.resizeInfo.version].size * 2, + faster.sectorSize); + + faster.resizeInfo.version = 1 - faster.resizeInfo.version; + break; + case Phase.REST: + // nothing to do + break; + default: + throw new FasterException("Invalid Enum Argument"); + } + } + + /// + public void GlobalAfterEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (next.phase) + { + case Phase.PREPARE_GROW: + faster.epoch.BumpCurrentEpoch(() => faster.GlobalStateMachineStep(next)); + break; + case Phase.IN_PROGRESS_GROW: + case Phase.REST: + // nothing to do + break; + default: + throw new FasterException("Invalid Enum Argument"); + } + } + + /// + public ValueTask OnThreadState( + SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, + bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (current.phase) + { + case Phase.PREPARE_GROW: + case Phase.IN_PROGRESS_GROW: + case Phase.REST: + return default; + default: + throw new FasterException("Invalid Enum Argument"); + } + } + } + + /// + /// Resizes the index + /// + internal class IndexResizeStateMachine : SynchronizationStateMachineBase + { + /// + /// Constructs a new IndexResizeStateMachine + /// + public IndexResizeStateMachine() : base(new IndexResizeTask()) {} + + /// + public override SystemState NextState(SystemState start) + { + var nextState = SystemState.Copy(ref start); + switch (start.phase) + { + case Phase.REST: + nextState.phase = Phase.PREPARE_GROW; + break; + case Phase.PREPARE_GROW: + nextState.phase = Phase.IN_PROGRESS_GROW; + break; + case Phase.IN_PROGRESS_GROW: + nextState.phase = Phase.REST; + break; + default: + throw new FasterException("Invalid Enum Argument"); + } + + return nextState; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs new file mode 100644 index 000000000..734d0ffba --- /dev/null +++ b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs @@ -0,0 +1,144 @@ +using System; +using System.ComponentModel; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// This task performs an index checkpoint. + /// + internal class IndexSnapshotTask : ISynchronizationTask + { + /// + public void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (next.phase) + { + case Phase.PREP_INDEX_CHECKPOINT: + if (faster._indexCheckpointToken == default) + { + faster._indexCheckpointToken = Guid.NewGuid(); + faster.InitializeIndexCheckpoint(faster._indexCheckpointToken); + } + + faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.startLogicalAddress); + break; + case Phase.INDEX_CHECKPOINT: + if (faster.UseReadCache && faster.ReadCache.BeginAddress != faster.ReadCache.TailAddress) + throw new FasterException("Index checkpoint with read cache is not supported"); + faster.TakeIndexFuzzyCheckpoint(); + break; + + case Phase.REST: + // If the tail address has already been obtained, because another task on the state machine + // has done so earlier (e.g. FullCheckpoint captures log tail at WAIT_FLUSH), don't update + // the tail address. + if (faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.finalLogicalAddress)) + faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress(); + if (faster._indexCheckpointToken != default) + { + faster.WriteIndexMetaInfo(); + faster._indexCheckpointToken = default; + } + faster._indexCheckpoint.Reset(); + + break; + } + } + + /// + public void GlobalAfterEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + } + + /// + public async ValueTask OnThreadState( + SystemState current, + SystemState prev, FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (current.phase) + { + case Phase.PREP_INDEX_CHECKPOINT: + if (ctx != null) + { + if (!ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt]) + ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true; + faster.epoch.Mark(EpochPhaseIdx.PrepareForIndexCheckpt, current.version); + } + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, current.version)) + faster.GlobalStateMachineStep(current); + break; + case Phase.INDEX_CHECKPOINT: + if (ctx != null) + { + // Resetting the marker for a potential FULL or INDEX_ONLY checkpoint in the future + ctx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false; + } + + if (async && !faster.IsIndexFuzzyCheckpointCompleted()) + { + clientSession?.UnsafeSuspendThread(); + await faster.IsIndexFuzzyCheckpointCompletedAsync(token); + clientSession?.UnsafeResumeThread(); + } + + faster.GlobalStateMachineStep(current); + break; + } + } + } + + /// + /// This state machine performs an index checkpoint + /// + internal class IndexSnapshotStateMachine : SynchronizationStateMachineBase + { + /// + /// Create a new IndexSnapshotStateMachine + /// + public IndexSnapshotStateMachine() : base(new IndexSnapshotTask()) + { + } + + /// + public override SystemState NextState(SystemState start) + { + var result = SystemState.Copy(ref start); + switch (start.phase) + { + case Phase.REST: + result.phase = Phase.PREP_INDEX_CHECKPOINT; + break; + case Phase.PREP_INDEX_CHECKPOINT: + result.phase = Phase.INDEX_CHECKPOINT; + break; + case Phase.INDEX_CHECKPOINT: + result.phase = Phase.REST; + result.version++; + break; + default: + throw new FasterException("Invalid Enum Argument"); + } + + return result; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Utilities/StateTransitions.cs b/cs/src/core/Index/Synchronization/StateTransitions.cs similarity index 81% rename from cs/src/core/Utilities/StateTransitions.cs rename to cs/src/core/Index/Synchronization/StateTransitions.cs index 990e12beb..806e8bc1c 100644 --- a/cs/src/core/Utilities/StateTransitions.cs +++ b/cs/src/core/Index/Synchronization/StateTransitions.cs @@ -16,7 +16,7 @@ namespace FASTER.core internal enum ResizeOperationStatus : int { IN_PROGRESS, DONE }; [StructLayout(LayoutKind.Explicit, Size = 8)] - internal unsafe struct ResizeInfo + internal struct ResizeInfo { [FieldOffset(0)] public ResizeOperationStatus status; @@ -29,21 +29,29 @@ internal unsafe struct ResizeInfo } internal enum Phase : int { - IN_PROGRESS, WAIT_PENDING, WAIT_FLUSH, PERSISTENCE_CALLBACK, REST, - PREP_INDEX_CHECKPOINT, INDEX_CHECKPOINT, PREPARE, - PREPARE_GROW, IN_PROGRESS_GROW, + IN_PROGRESS, + WAIT_PENDING, + WAIT_FLUSH, + PERSISTENCE_CALLBACK, + WAIT_INDEX_CHECKPOINT, + REST, + PREP_INDEX_CHECKPOINT, + INDEX_CHECKPOINT, + PREPARE, + PREPARE_GROW, + IN_PROGRESS_GROW, INTERMEDIATE, }; [StructLayout(LayoutKind.Explicit, Size = 8)] - internal unsafe struct SystemState + internal struct SystemState { [FieldOffset(0)] public Phase phase; [FieldOffset(4)] public int version; - + [FieldOffset(0)] public long word; @@ -63,7 +71,5 @@ public static SystemState Make(Phase status, int version) info.version = version; return info; } - } - } diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs new file mode 100644 index 000000000..a5ea837a4 --- /dev/null +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -0,0 +1,207 @@ +using System; +using System.ComponentModel; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// A Version change captures a version on the log by forcing all threads to coordinate a move to the next + /// version. It is used as the basis of many other tasks, which decides what they do with the captured + /// version. + /// + internal class VersionChangeTask : ISynchronizationTask + { + /// + public void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + } + + /// + public void GlobalAfterEnteringState( + SystemState start, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + } + + /// + public ValueTask OnThreadState( + SystemState current, SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, + bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions + { + switch (current.phase) + { + case Phase.PREPARE: + if (ctx != null) + { + if (!ctx.markers[EpochPhaseIdx.Prepare]) + { + if (!faster.RelaxedCPR) + faster.AcquireSharedLatchesForAllPendingRequests(ctx); + ctx.markers[EpochPhaseIdx.Prepare] = true; + } + + faster.epoch.Mark(EpochPhaseIdx.Prepare, current.version); + } + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.Prepare, current.version)) + faster.GlobalStateMachineStep(current); + break; + case Phase.IN_PROGRESS: + if (ctx != null) + { + // Need to be very careful here as threadCtx is changing + var _ctx = prev.phase == Phase.IN_PROGRESS ? ctx.prevCtx : ctx; + + if (!_ctx.markers[EpochPhaseIdx.InProgress]) + { + faster.AtomicSwitch(ctx, ctx.prevCtx, _ctx.version); + faster.InitContext(ctx, ctx.prevCtx.guid, ctx.prevCtx.serialNum); + + // Has to be prevCtx, not ctx + ctx.prevCtx.markers[EpochPhaseIdx.InProgress] = true; + } + + faster.epoch.Mark(EpochPhaseIdx.InProgress, current.version); + } + + // Has to be prevCtx, not ctx + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.InProgress, current.version)) + faster.GlobalStateMachineStep(current); + break; + case Phase.WAIT_PENDING: + if (ctx != null) + { + if (!ctx.prevCtx.markers[EpochPhaseIdx.WaitPending]) + { + if (ctx.prevCtx.HasNoPendingRequests) + ctx.prevCtx.markers[EpochPhaseIdx.WaitPending] = true; + else + break; + } + + faster.epoch.Mark(EpochPhaseIdx.WaitPending, current.version); + } + + if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitPending, current.version)) + faster.GlobalStateMachineStep(current); + break; + case Phase.REST: + break; + } + + return default; + } + } + + /// + /// The FoldOver task simply sets the read only offset to the current end of the log, so a captured version + /// is immutable and will eventually be flushed to disk. + /// + internal class FoldOverTask : ISynchronizationTask + { + /// + public void GlobalBeforeEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions + { + if (next.phase == Phase.REST) + // Before leaving the checkpoint, make sure all previous versions are read-only. + faster.hlog.ShiftReadOnlyToTail(out _, out _); + } + + /// + public void GlobalAfterEnteringState( + SystemState next, + FasterKV faster) + where Key : new() + where Value : new() + where Functions : IFunctions { } + + /// + public ValueTask OnThreadState( + SystemState current, + SystemState prev, + FasterKV faster, + FasterKV.FasterExecutionContext ctx, + ClientSession clientSession, bool async = true, + CancellationToken token = default) + where Key : new() + where Value : new() + where Functions : IFunctions + { + return default; + } + } + + /// + /// A VersionChangeStateMachine orchestrates to capture a version, but does not flush to disk. + /// + internal class VersionChangeStateMachine : SynchronizationStateMachineBase + { + private readonly long targetVersion; + + /// + /// Construct a new VersionChangeStateMachine with the given tasks. Does not load any tasks by default. + /// + /// upper limit (inclusive) of the version included + /// The tasks to load onto the state machine + protected VersionChangeStateMachine(long targetVersion = -1, params ISynchronizationTask[] tasks) : base(tasks) + { + this.targetVersion = targetVersion; + } + + /// + /// Construct a new VersionChangeStateMachine that folds over the log at the end without waiting for flush. + /// + /// upper limit (inclusive) of the version included + public VersionChangeStateMachine(long targetVersion = -1) : this(targetVersion, new VersionChangeTask(), new FoldOverTask()) { } + + /// + public override SystemState NextState(SystemState start) + { + var nextState = SystemState.Copy(ref start); + switch (start.phase) + { + case Phase.REST: + nextState.phase = Phase.PREPARE; + break; + case Phase.PREPARE: + nextState.phase = Phase.IN_PROGRESS; + // TODO: Move to long for system state as well. + nextState.version = (int) (targetVersion == -1 ? start.version + 1 : targetVersion + 1); + break; + case Phase.IN_PROGRESS: + // This phase has no effect if using relaxed CPR model + nextState.phase = Phase.WAIT_PENDING; + break; + case Phase.WAIT_PENDING: + nextState.phase = Phase.REST; + break; + default: + throw new FasterException("Invalid Enum Argument"); + } + + return nextState; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Utilities/AsyncCountDown.cs b/cs/src/core/Utilities/AsyncCountDown.cs index b234a807d..44358f75f 100644 --- a/cs/src/core/Utilities/AsyncCountDown.cs +++ b/cs/src/core/Utilities/AsyncCountDown.cs @@ -15,7 +15,7 @@ internal sealed class AsyncCountDown int counter; TaskCompletionSource tcs; TaskCompletionSource nextTcs; - + public AsyncCountDown() { nextTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -45,7 +45,7 @@ private void TryCompleteAwaitingTask() Volatile.Read(ref tcs)?.TrySetResult(0); // Reset TCS, so next awaiters produce a new one - Interlocked.Exchange(ref tcs, null); + Interlocked.Exchange(ref tcs, null); } /// @@ -58,17 +58,15 @@ public Task WaitEmptyAsync() return Task.CompletedTask; return GetOrCreateTaskCompletionSource(); - } [MethodImpl(MethodImplOptions.AggressiveInlining)] private Task GetOrCreateTaskCompletionSource() { - // If tcs is not null, we'll get it in taskSource var taskSource = Interlocked.CompareExchange(ref tcs, nextTcs, null); - if (taskSource == null) + if (taskSource == null) { // Tcs was null and nextTcs got assigned to it. taskSource = nextTcs; @@ -83,4 +81,4 @@ private Task GetOrCreateTaskCompletionSource() return taskSource.Task; } } -} +} \ No newline at end of file diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/RecoveryTests.cs similarity index 67% rename from cs/test/FullRecoveryTests.cs rename to cs/test/RecoveryTests.cs index ba744188d..b720f8b5a 100644 --- a/cs/test/FullRecoveryTests.cs +++ b/cs/test/RecoveryTests.cs @@ -10,18 +10,18 @@ namespace FASTER.test.recovery.sumstore { - [TestFixture] - internal class FullRecoveryTests + internal class RecoveryTests { const long numUniqueKeys = (1 << 14); const long keySpace = (1L << 14); const long numOps = (1L << 19); const long completePendingInterval = (1L << 10); const long checkpointInterval = (1L << 16); + private FasterKV fht; private string test_path; - private Guid token; + private List logTokens, indexTokens; private IDevice log; [SetUp] @@ -37,10 +37,10 @@ public void Setup() log = Devices.CreateLogDevice(test_path + "\\FullRecoveryTests.log"); fht = new FasterKV - (keySpace, new Functions(), - new LogSettings { LogDevice = log }, - new CheckpointSettings { CheckpointDir = test_path, CheckPointType = CheckpointType.Snapshot } - ); + (keySpace, new Functions(), + new LogSettings {LogDevice = log}, + new CheckpointSettings {CheckpointDir = test_path, CheckPointType = CheckpointType.Snapshot} + ); } [TearDown] @@ -74,18 +74,83 @@ public static void DeleteDirectory(string path) } [Test] - public void RecoveryTest1() + public void RecoveryTestSeparateCheckpoint() { - Populate(); - fht.Dispose(); - fht = null; - log.Close(); - Setup(); - RecoverAndTest(token, token); + Populate(SeparateCheckpointAction); + + for (var i = 0; i < logTokens.Count; i++) + { + if (i >= indexTokens.Count) break; + fht.Dispose(); + fht = null; + log.Close(); + Setup(); + RecoverAndTest(logTokens[i], indexTokens[i]); + } + } + + [Test] + public void RecoveryTestFullCheckpoint() + { + Populate(FullCheckpointAction); + + foreach (var token in logTokens) + { + fht.Dispose(); + fht = null; + log.Close(); + Setup(); + RecoverAndTest(token, token); + } + } + + public void FullCheckpointAction(int opNum) + { + if ((opNum + 1) % checkpointInterval == 0) + { + Guid token; + while (!fht.TakeFullCheckpoint(out token)) + { + } + + logTokens.Add(token); + indexTokens.Add(token); + + fht.CompleteCheckpointAsync().GetAwaiter().GetResult(); + } } - public void Populate() + public void SeparateCheckpointAction(int opNum) { + if ((opNum + 1) % checkpointInterval != 0) return; + + var checkpointNum = (opNum + 1) / checkpointInterval; + if (checkpointNum % 2 == 1) + { + Guid token; + while (!fht.TakeHybridLogCheckpoint(out token)) + { + } + + logTokens.Add(token); + fht.CompleteCheckpointAsync().GetAwaiter().GetResult(); + } + else + { + Guid token; + while (!fht.TakeIndexCheckpoint(out token)) + { + } + + indexTokens.Add(token); + fht.CompleteCheckpointAsync().GetAwaiter().GetResult(); + } + } + + public void Populate(Action checkpointAction) + { + logTokens = new List(); + indexTokens = new List(); // Prepare the dataset var inputArray = new AdInput[numOps]; for (int i = 0; i < numOps; i++) @@ -103,17 +168,7 @@ public void Populate() { session.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i); - if ((i+1) % checkpointInterval == 0) - { - if (first) - while (!fht.TakeFullCheckpoint(out token)) ; - else - while (!fht.TakeFullCheckpoint(out Guid nextToken)) ; - - fht.CompleteCheckpointAsync().GetAwaiter().GetResult(); - - first = false; - } + checkpointAction(i); if (i % completePendingInterval == 0) { @@ -131,7 +186,7 @@ public void Populate() public void RecoverAndTest(Guid cprVersion, Guid indexVersion) { // Recover - fht.Recover(cprVersion, indexVersion); + fht.Recover(indexVersion, cprVersion); // Create array for reading var inputArray = new AdInput[numUniqueKeys]; @@ -194,8 +249,9 @@ public void RecoverAndTest(Guid cprVersion, Guid indexVersion) { Assert.IsTrue( expected[i] == inputArray[i].numClicks.numClicks, - "Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], inputArray[i].numClicks.numClicks); + "Debug error for AdId {0}: Expected ({1}), Found({2})", inputArray[i].adId.adId, expected[i], + inputArray[i].numClicks.numClicks); } } } -} +} \ No newline at end of file