diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index 7f38f032f..36d872d40 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -17,6 +17,12 @@ public partial class FasterKV : F where Value : new() where Functions : IFunctions { + /// + /// Whether we are in relaxed CPR mode, where IO pending ops are not + /// part of the CPR checkpoint + /// + public bool RelaxedCPR = false; + /// /// Complete outstanding pending operations /// @@ -25,34 +31,42 @@ internal async ValueTask CompletePendingAsync(ClientSession 0) + { + clientSession.ResumeThread(); + CompleteRetryRequests(clientSession.prevCtx, clientSession); + clientSession.SuspendThread(); + } - done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0); - done &= (prevThreadCtx.Value.retryRequests.Count == 0); + done &= (clientSession.prevCtx.ioPendingRequests.Count == 0); + done &= (clientSession.prevCtx.retryRequests.Count == 0); + } + #endregion } - #endregion - if (!(threadCtx.Value.phase == Phase.IN_PROGRESS + if (RelaxedCPR || (!(clientSession.ctx.phase == Phase.IN_PROGRESS || - threadCtx.Value.phase == Phase.WAIT_PENDING)) + clientSession.ctx.phase == Phase.WAIT_PENDING))) { - await CompleteIOPendingRequestsAsync(threadCtx.Value, clientSession); - Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0); + await CompleteIOPendingRequestsAsync(clientSession.ctx, clientSession); + Debug.Assert(clientSession.ctx.ioPendingRequests.Count == 0); } - CompleteRetryRequests(threadCtx.Value); + CompleteRetryRequests(clientSession.ctx, clientSession); - done &= (threadCtx.Value.ioPendingRequests.Count == 0); - done &= (threadCtx.Value.retryRequests.Count == 0); + done &= (clientSession.ctx.ioPendingRequests.Count == 0); + done &= (clientSession.ctx.retryRequests.Count == 0); if (!done) { @@ -89,6 +103,8 @@ internal async ValueTask CompleteCheckpointAsync(ClientSession clientSession) { + clientSession.ResumeThread(); + epoch.ProtectAndDrain(); epoch.Check(); @@ -107,7 +123,7 @@ internal async ValueTask InternalRefreshAsync(ClientSession public ClientSession StartClientSession() { + // We have to use relaxed CPR with async client sessions + this.RelaxedCPR = true; + Guid guid = Guid.NewGuid(); var ctx = new FasterExecutionContext(); InitContext(ctx, guid); @@ -47,6 +50,9 @@ public ClientSession StartClientS /// public ClientSession ContinueClientSession(Guid guid, out long lsn) { + // We have to use relaxed CPR with async client sessions + this.RelaxedCPR = true; + lsn = InternalContinue(guid); if (lsn == -1) throw new Exception($"Unable to find session {guid} to recover"); diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 450897a9d..8d7eac4f9 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -134,11 +134,24 @@ internal void InitContext(FasterExecutionContext ctx, Guid token) ctx.version = _systemState.version; ctx.markers = new bool[8]; ctx.serialNum = 0; - ctx.totalPending = 0; ctx.guid = token; - ctx.retryRequests = new Queue(); - ctx.readyResponses = new AsyncQueue>(); - ctx.ioPendingRequests = new Dictionary(); + + if (RelaxedCPR) + { + if (ctx.retryRequests == null) + { + ctx.retryRequests = new Queue(); + ctx.readyResponses = new AsyncQueue>(); + ctx.ioPendingRequests = new Dictionary(); + } + } + else + { + ctx.totalPending = 0; + ctx.retryRequests = new Queue(); + ctx.readyResponses = new AsyncQueue>(); + ctx.ioPendingRequests = new Dictionary(); + } } internal void CopyContext(FasterExecutionContext src, FasterExecutionContext dst) @@ -147,11 +160,14 @@ internal void CopyContext(FasterExecutionContext src, FasterExecutionContext dst dst.version = src.version; dst.markers = src.markers; dst.serialNum = src.serialNum; - dst.totalPending = src.totalPending; dst.guid = src.guid; - dst.retryRequests = src.retryRequests; - dst.readyResponses = src.readyResponses; - dst.ioPendingRequests = src.ioPendingRequests; + if (!RelaxedCPR) + { + dst.totalPending = src.totalPending; + dst.retryRequests = src.retryRequests; + dst.readyResponses = src.readyResponses; + dst.ioPendingRequests = src.ioPendingRequests; + } } internal bool InternalCompletePending(bool wait = false) @@ -160,19 +176,22 @@ internal bool InternalCompletePending(bool wait = false) { bool done = true; - #region Previous pending requests - if (threadCtx.Value.phase == Phase.IN_PROGRESS - || - threadCtx.Value.phase == Phase.WAIT_PENDING) + if (!RelaxedCPR) { - CompleteIOPendingRequests(prevThreadCtx.Value); - Refresh(); - CompleteRetryRequests(prevThreadCtx.Value); + #region Previous pending requests + if (threadCtx.Value.phase == Phase.IN_PROGRESS + || + threadCtx.Value.phase == Phase.WAIT_PENDING) + { + CompleteIOPendingRequests(prevThreadCtx.Value); + Refresh(); + CompleteRetryRequests(prevThreadCtx.Value); - done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0); - done &= (prevThreadCtx.Value.retryRequests.Count == 0); + done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0); + done &= (prevThreadCtx.Value.retryRequests.Count == 0); + } + #endregion } - #endregion if (!(threadCtx.Value.phase == Phase.IN_PROGRESS || @@ -204,6 +223,9 @@ internal bool InternalCompletePending(bool wait = false) internal void CompleteRetryRequests(FasterExecutionContext context) { int count = context.retryRequests.Count; + + if (count == 0) return; + for (int i = 0; i < count; i++) { var pendingContext = context.retryRequests.Dequeue(); @@ -211,6 +233,21 @@ internal void CompleteRetryRequests(FasterExecutionContext context) } } + internal void CompleteRetryRequests(FasterExecutionContext context, ClientSession clientSession, CancellationToken token = default(CancellationToken)) + { + int count = context.retryRequests.Count; + + if (count == 0) return; + + clientSession.ResumeThread(); + for (int i = 0; i < count; i++) + { + var pendingContext = context.retryRequests.Dequeue(); + InternalRetryRequestAndCallback(context, pendingContext); + } + clientSession.SuspendThread(); + } + internal void CompleteIOPendingRequests(FasterExecutionContext context) { if (context.readyResponses.Count == 0) return; @@ -258,15 +295,20 @@ internal void InternalRetryRequestAndCallback( ref Key key = ref pendingContext.key.Get(); ref Value value = ref pendingContext.value.Get(); - #region Entry latch operation - var handleLatches = false; - if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1) - || - (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch + bool handleLatches = false; + + if (!RelaxedCPR) { - handleLatches = true; + #region Entry latch operation + + if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1) + || + (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch + { + handleLatches = true; + } + #endregion } - #endregion // Issue retry command switch(pendingContext.type) @@ -333,12 +375,16 @@ internal void InternalContinuePendingRequestAndCallback( FasterExecutionContext ctx, AsyncIOContext request) { - var handleLatches = false; - if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1) - || - (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch + bool handleLatches = false; + + if (!RelaxedCPR) { - handleLatches = true; + if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1) + || + (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch + { + handleLatches = true; + } } if (ctx.ioPendingRequests.TryGetValue(request.id, out PendingContext pendingContext)) diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 13755a759..e01e50e62 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -448,7 +448,7 @@ private void AcquireSharedLatchesForAllPendingRequests() * * GC: * REST -> GC -> REST - */ + */ [MethodImpl(MethodImplOptions.AggressiveInlining)] private SystemState GetNextState(SystemState start, CheckpointType type = CheckpointType.FULL) { @@ -496,7 +496,7 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp nextState.version = start.version + 1; break; case Phase.IN_PROGRESS: - nextState.phase = Phase.WAIT_PENDING; + nextState.phase = RelaxedCPR ? Phase.WAIT_FLUSH : Phase.WAIT_PENDING; break; case Phase.WAIT_PENDING: nextState.phase = Phase.WAIT_FLUSH;