diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index f15f90a6c..4e66f5433 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -310,7 +310,8 @@ public void Refresh() } /// - /// Sync complete outstanding pending operations + /// Sync complete all outstanding pending operations + /// Async operations (ReadAsync) must be completed individually /// /// Spin-wait for all pending operations on session to complete /// Extend spin-wait until ongoing commit/checkpoint, if any, completes @@ -365,6 +366,22 @@ public async ValueTask CompletePendingAsync(bool waitForCommit = false, Cancella await WaitForCommitAsync(token); } + /// + /// Check if at least one request is ready for CompletePending to be called on + /// Returns completed immediately if there are no outstanding requests + /// + /// + /// + public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = default) + { + token.ThrowIfCancellationRequested(); + + if (fht.epoch.ThisInstanceProtected()) + throw new NotSupportedException("Async operations not supported over protected epoch"); + + await fht.ReadyToCompletePendingAsync(this, token); + } + /// /// Wait for commit of all operations completed until the current point in session. /// Does not itself issue checkpoint/commits. diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index abe926918..d351cd05d 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -26,6 +26,30 @@ public partial class FasterKV : F where Value : new() where Functions : IFunctions { + + /// + /// Check if at least one (sync) request is ready for CompletePending to operate on + /// + /// + /// + /// + internal async ValueTask ReadyToCompletePendingAsync(ClientSession clientSession, CancellationToken token = default) + { + #region Previous pending requests + if (!RelaxedCPR) + { + if (clientSession.ctx.phase == Phase.IN_PROGRESS || clientSession.ctx.phase == Phase.WAIT_PENDING) + { + if (clientSession.ctx.prevCtx.SyncIoPendingCount != 0) + await clientSession.ctx.prevCtx.readyResponses.WaitForEntryAsync(token); + } + } + #endregion + + if (clientSession.ctx.SyncIoPendingCount != 0) + await clientSession.ctx.readyResponses.WaitForEntryAsync(token); + } + /// /// Complete outstanding pending operations that were issued synchronously /// Async operations (e.g., ReadAsync) need to be completed individually @@ -46,7 +70,7 @@ internal async ValueTask CompletePendingAsync(ClientSession 0) { @@ -106,7 +130,7 @@ internal ReadAsyncInternal(FasterKV SlowReadAsync( { var diskRequest = @this.ScheduleGetFromDisk(clientSession.ctx, ref pendingContext); clientSession.ctx.ioPendingRequests.Add(pendingContext.id, pendingContext); + clientSession.ctx.asyncPendingCount++; clientSession.ctx.pendingReads.Add(); try @@ -226,6 +252,7 @@ static async ValueTask SlowReadAsync( catch { clientSession.ctx.ioPendingRequests.Remove(pendingContext.id); + clientSession.ctx.asyncPendingCount--; throw; } finally diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index fbb78f668..03630ad72 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -103,14 +103,16 @@ internal sealed class FasterExecutionContext : SerializedFasterExecutionContext public AsyncCountDown pendingReads; public AsyncQueue> readyResponses; public List excludedSerialNos; + public int asyncPendingCount; + + public int SyncIoPendingCount => ioPendingRequests.Count - asyncPendingCount; public bool HasNoPendingRequests { [MethodImpl(MethodImplOptions.AggressiveInlining)] get { - return ioPendingRequests.Count == 0 - && retryRequests.Count == 0; + return SyncIoPendingCount == 0 && retryRequests.Count == 0; } } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 483d2332a..038bc5763 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -279,7 +279,7 @@ internal void InternalCompletePendingRequests(FasterExecutionContext opCtx, Fast internal async ValueTask InternalCompletePendingRequestsAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession clientSession, CancellationToken token = default) { - while (opCtx.ioPendingRequests.Count > 0) + while (opCtx.SyncIoPendingCount > 0) { AsyncIOContext request; @@ -363,7 +363,7 @@ internal void InternalCompletePendingRequest(FasterExecutionContext opCtx, Faste } } - internal (Status, Output) InternalCompletePendingReadRequestAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext request, PendingContext pendingContext) + internal (Status, Output) InternalCompletePendingReadRequest(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext request, PendingContext pendingContext) { (Status, Output) s = default; diff --git a/cs/src/core/Utilities/AsyncQueue.cs b/cs/src/core/Utilities/AsyncQueue.cs index 0d7b2929c..e8e51cf25 100644 --- a/cs/src/core/Utilities/AsyncQueue.cs +++ b/cs/src/core/Utilities/AsyncQueue.cs @@ -58,6 +58,16 @@ public async Task DequeueAsync(CancellationToken cancellationToken = default) } } + /// + /// Wait for queue to have at least one entry + /// + /// + /// + public async Task WaitForEntryAsync(CancellationToken token = default) + { + await semaphore.WaitAsync(token); + } + /// /// Try dequeue (if item exists) ///