Skip to content

Commit

Permalink
Ordered IO completion option in CompletePending
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Mar 28, 2023
1 parent b4d91a2 commit 391caa8
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cs/src/core/Async/CompletePendingAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ internal static ValueTask ReadyToCompletePendingAsync<Input, Output, Context>(Fa
/// </summary>
/// <returns></returns>
internal async ValueTask CompletePendingAsync<Input, Output, Context, FasterSession>(FasterSession fasterSession,
CancellationToken token, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs)
CancellationToken token, CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool orderedResponses)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
while (true)
{
fasterSession.UnsafeResumeThread();
try
{
InternalCompletePendingRequests(fasterSession, completedOutputs);
InternalCompletePendingRequests(fasterSession, completedOutputs, orderedResponses);
}
finally
{
Expand Down
16 changes: 8 additions & 8 deletions cs/src/core/ClientSession/BasicContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ public void UnsafeSuspendThread()
#region IFasterContext

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
=> clientSession.CompletePending(wait, spinWaitForCommit);
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
=> clientSession.CompletePending(wait, spinWaitForCommit, orderedResponses);

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
=> clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses);

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> clientSession.CompletePendingAsync(waitForCommit, token);
public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
=> clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token);

/// <inheritdoc/>
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
=> clientSession.CompletePendingWithOutputsAsync(waitForCommit, token);
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
=> clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
40 changes: 20 additions & 20 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,14 +610,14 @@ public IEnumerable<long> GetPendingRequests()
}

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
=> CompletePending(false, wait, spinWaitForCommit);
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
=> CompletePending(false, wait, spinWaitForCommit, orderedResponses);

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
{
InitializeCompletedOutputs();
var result = CompletePending(true, wait, spinWaitForCommit);
var result = CompletePending(true, wait, spinWaitForCommit, orderedResponses);
completedOutputs = this.completedOutputs;
return result;
}
Expand All @@ -626,11 +626,11 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, I
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
/// Assumes epoch protection is managed by user. Async operations must be completed individually.
/// </summary>
internal bool UnsafeCompletePendingWithOutputs<FasterSession>(FasterSession fasterSession, out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
internal bool UnsafeCompletePendingWithOutputs<FasterSession>(FasterSession fasterSession, out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
InitializeCompletedOutputs();
var result = UnsafeCompletePending(fasterSession, true, wait, spinWaitForCommit);
var result = UnsafeCompletePending(fasterSession, true, wait, spinWaitForCommit, orderedResponses);
completedOutputs = this.completedOutputs;
return result;
}
Expand All @@ -643,34 +643,34 @@ private void InitializeCompletedOutputs()
this.completedOutputs.Dispose();
}

internal bool CompletePending(bool getOutputs, bool wait, bool spinWaitForCommit)
internal bool CompletePending(bool getOutputs, bool wait, bool spinWaitForCommit, bool orderedResponses)
{
UnsafeResumeThread();
try
{
return UnsafeCompletePending(FasterSession, getOutputs, wait, spinWaitForCommit);
return UnsafeCompletePending(FasterSession, getOutputs, wait, spinWaitForCommit, orderedResponses);
}
finally
{
UnsafeSuspendThread();
}
}

internal bool UnsafeCompletePending<FasterSession>(FasterSession fasterSession, bool getOutputs, bool wait, bool spinWaitForCommit)
internal bool UnsafeCompletePending<FasterSession>(FasterSession fasterSession, bool getOutputs, bool wait, bool spinWaitForCommit, bool orderedResponses)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var requestedOutputs = getOutputs ? this.completedOutputs : default;
var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses);
if (spinWaitForCommit)
{
if (!wait)
throw new FasterException("Can spin-wait for commit (checkpoint completion) only if wait is true");
do
{
fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses);
if (fht.InRestPhase())
{
fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses);
return true;
}
} while (wait);
Expand All @@ -679,26 +679,26 @@ internal bool UnsafeCompletePending<FasterSession>(FasterSession fasterSession,
}

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> CompletePendingAsync(false, waitForCommit, token);
public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
=> CompletePendingAsync(false, waitForCommit, orderedResponses, token);

/// <inheritdoc/>
public async ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
public async ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
{
InitializeCompletedOutputs();
await CompletePendingAsync(true, waitForCommit, token).ConfigureAwait(false);
await CompletePendingAsync(true, waitForCommit, orderedResponses, token).ConfigureAwait(false);
return this.completedOutputs;
}

private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit = false, CancellationToken token = default)
private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
{
token.ThrowIfCancellationRequested();

if (fht.epoch.ThisInstanceProtected())
throw new NotSupportedException("Async operations not supported over protected epoch");

// Complete all pending operations on session
await fht.CompletePendingAsync(this.FasterSession, token, getOutputs ? this.completedOutputs : null).ConfigureAwait(false);
await fht.CompletePendingAsync(this.FasterSession, token, getOutputs ? this.completedOutputs : null, orderedResponses).ConfigureAwait(false);

// Wait for commit if necessary
if (waitForCommit)
Expand Down Expand Up @@ -1381,8 +1381,8 @@ public IHeapContainer<Input> GetHeapContainer(ref Input input)

public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread();

public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses);

public FasterKV<Key, Value>.FasterExecutionContext<Input, Output, Context> Ctx => this._clientSession.ctx;
#endregion Internal utilities
Expand Down
10 changes: 6 additions & 4 deletions cs/src/core/ClientSession/IFasterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public interface IFasterContext<Key, Value, Input, Output, Context>
/// </summary>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <param name="orderedResponses">Whether responses are process in the order of IO issue</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
bool CompletePending(bool wait = false, bool spinWaitForCommit = false);
bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false);

/// <summary>
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
Expand All @@ -27,22 +28,23 @@ public interface IFasterContext<Key, Value, Input, Output, Context>
/// <param name="completedOutputs">Outputs completed by this operation</param>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <param name="orderedResponses">Whether responses are process in the order of IO issue</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false);
bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false);

/// <summary>
/// Complete all pending synchronous FASTER operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns></returns>
ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default);
ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default);

/// <summary>
/// Complete all pending synchronous FASTER operations, returning outputs for the completed operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns>Outputs completed by this operation</returns>
ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default);
ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default);

/// <summary>
/// Read operation
Expand Down
20 changes: 10 additions & 10 deletions cs/src/core/ClientSession/LockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ public void Unlock<TLockableKey>(TLockableKey[] keys, int start, int count)
#region IFasterContext

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
clientSession.UnsafeResumeThread();
try
{
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit, orderedResponses);
}
finally
{
Expand All @@ -185,13 +185,13 @@ public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
}

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
clientSession.UnsafeResumeThread();
try
{
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit);
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit, orderedResponses);
}
finally
{
Expand All @@ -200,12 +200,12 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, I
}

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingAsync(waitForCommit, token);
public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
=> this.clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token);

/// <inheritdoc/>
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token);
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default)
=> this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -750,8 +750,8 @@ public IHeapContainer<Input> GetHeapContainer(ref Input input)

public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread();

public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses);

public FasterKV<Key, Value>.FasterExecutionContext<Input, Output, Context> Ctx => this._clientSession.ctx;
#endregion Internal utilities
Expand Down
Loading

0 comments on commit 391caa8

Please sign in to comment.