[C#] Refactor retry logic to ensure stability in InternalRMW et al. (#…
…727)

* Fix missing Tombstone check in ReadOnly for RMW/Upsert
Delete doesn't need to do anything if record is already deleted
Add Unseal() on InternalContinuePendingRMW (as InternalContinuePendingRead does)
Add UTs to verify Recover() clears locks
Add an Assert in MultiThreadTests to perhaps provide info on intermittent "output.Memory" null ref

* WIP: Refactor to move epoch-releases outside InternalXxx functions
- Add HandleImmediateRetryStatus to replace "internalStatus == RETRY_NOW" in loops that call InternalXxx and pending IO continuation routines
- Ensure RETRY_LATER, rather than RETRY_NOW, is used in places such as sealing that require a subsequent operation by another thread (such as inserting a new record) to bring things to a final state (this removes some race conditions)
- Remove FasterExecutionContext.retryRequests and all related routines
- Minimize HandleOperationStatus; it's now mostly to enqueue the IO
- Update *Async.cs accordingly
- Update ReadAsync.cs to handle the fact that allocation can fail in Read due to PendingContext.CopyReadsToTailFromReadOnly
- Remove unused InternalCompletePendingRequestsAsync
- Change some names for clarity

* Finish revisions for HandleImmediateRetryStatus:
- Finish converting ReadAsync to handle retry due to memory allocation failures (which may then lead to a pending IO); it now uses the same infrastructure as RMWAsync
- Further tighten up *Async.cs handling of sync vs. async IO, removing the need for some IUpdateAsyncOperation methods since it can be handled by UpdateAsync
- Fixes for the previous commit for:
  - Compaction
  - BlockAllocate(ReadCache) for TryAllocate returning -1
  - Back out a change to LockTable.Get that was better handled by one caller
  - InternalRmw should not fall through to SkipAndInvalidateReadCache if RECORD_ON_DISK
  - Remove some mistaken and investigation-only Debug.Asserts
  - Add HandleImmediateRetryStatus to some other places it was needed
- Make TryCompletePendingSyncIO handle all CompletePendingWithOutput cases so the caller knows whether to issue another TryCompleteSync()

* Stressie: fix Read/RMW when deletes are present

* - Remove redundant calls to OperationStatusUtils.TryConvertToCompletedStatusCode (most are already done within HandleOperationStatus)
- Rename the readcache chain stress tests for clarity in CI log output, and use a larger hashtable size (the previous one was almost identical to Modulo.Thousand)

* Rename UpdateAsync to AsyncOperation, since ReadAsync uses it now also

* Squashed commit of the following:

commit e239009
Author: Badrish Chandramouli <badrishc@microsoft.com>
Date:   Tue Aug 2 17:22:01 2022 -0700

    minor fix

commit 1a742b6
Author: Badrish Chandramouli <badrishc@microsoft.com>
Date:   Tue Aug 2 14:59:31 2022 -0700

    Apply same fix to Delete

commit 008c173
Author: Badrish Chandramouli <badrishc@microsoft.com>
Date:   Tue Aug 2 11:57:05 2022 -0700

    * After acquiring a latch, we should never return from InternalXxx without going to LatchRelease.
    * Add Retry status to LatchDestination so that we do not create pending context for retry scenarios.

commit 11cc933
Author: Badrish Chandramouli <badrishc@microsoft.com>
Date:   Tue Aug 2 10:11:31 2022 -0700

    Special case common methods to optimize read path
    Materialize session read flags merged to store read flags
    Optimize common case for HandleImmediateRetryStatus
    Add thread yield to retry now

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
TedHartMS and badrishc authored Aug 3, 2022
1 parent a3aafc8 commit b1f1cef
Showing 20 changed files with 663 additions and 775 deletions.

3 changes: 1 addition & 2 deletions cs/src/core/Async/CompletePendingAsync.cs
@@ -19,7 +19,7 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
/// <param name="currentCtx"></param>
/// <param name="token"></param>
/// <returns></returns>
internal ValueTask ReadyToCompletePendingAsync<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> currentCtx, CancellationToken token = default)
internal static ValueTask ReadyToCompletePendingAsync<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> currentCtx, CancellationToken token = default)
=> currentCtx.WaitPendingAsync(token);

/// <summary>
@@ -37,7 +37,6 @@ internal async ValueTask CompletePendingAsync<Input, Output, Context>(IFasterSes
try
{
InternalCompletePendingRequests(currentCtx, currentCtx, fasterSession, completedOutputs);
InternalCompleteRetryRequests(currentCtx, currentCtx, fasterSession, completedOutputs);
}
finally
{
41 changes: 18 additions & 23 deletions cs/src/core/Async/DeleteAsync.cs
@@ -11,42 +11,39 @@ namespace FASTER.core
{
public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
internal struct DeleteAsyncOperation<Input, Output, Context> : IUpdateAsyncOperation<Input, Output, Context, DeleteAsyncResult<Input, Output, Context>>
internal struct DeleteAsyncOperation<Input, Output, Context> : IAsyncOperation<Input, Output, Context, DeleteAsyncResult<Input, Output, Context>>
{
/// <inheritdoc/>
public DeleteAsyncResult<Input, Output, Context> CreateResult(Status status, Output output, RecordMetadata recordMetadata) => new DeleteAsyncResult<Input, Output, Context>(status);
public DeleteAsyncResult<Input, Output, Context> CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new DeleteAsyncResult<Input, Output, Context>(status);

/// <inheritdoc/>
public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<Input, Output, Context> pendingContext, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
FasterExecutionContext<Input, Output, Context> currentCtx, bool asyncOp, out CompletionEvent flushEvent)
FasterExecutionContext<Input, Output, Context> currentCtx, out Output output)
{
OperationStatus internalStatus;
do
{
flushEvent = fasterKV.hlog.FlushEvent;
internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum);
} while (internalStatus == OperationStatus.RETRY_NOW);
} while (fasterKV.HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pendingContext));
output = default;
return TranslateStatus(internalStatus);
}

/// <inheritdoc/>
public ValueTask<DeleteAsyncResult<Input, Output, Context>> DoSlowOperation(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
FasterExecutionContext<Input, Output, Context> currentCtx, PendingContext<Input, Output, Context> pendingContext, CompletionEvent flushEvent, CancellationToken token)
=> SlowDeleteAsync(fasterKV, fasterSession, currentCtx, pendingContext, flushEvent, token);
FasterExecutionContext<Input, Output, Context> currentCtx, PendingContext<Input, Output, Context> pendingContext, CancellationToken token)
=> SlowDeleteAsync(fasterKV, fasterSession, currentCtx, pendingContext, token);

/// <inheritdoc/>
public bool CompletePendingIO(IFasterSession<Key, Value, Input, Output, Context> fasterSession) => false;

/// <inheritdoc/>
public void DecrementPending(FasterExecutionContext<Input, Output, Context> currentCtx, ref PendingContext<Input, Output, Context> pendingContext) { }
public bool HasPendingIO => false;
}

/// <summary>
/// State storage for the completion of an async Delete, or the result if the Delete was completed synchronously
/// </summary>
public struct DeleteAsyncResult<Input, Output, Context>
{
internal readonly UpdateAsyncInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>> updateAsyncInternal;
internal readonly AsyncOperationInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>> updateAsyncInternal;

/// <summary>Current status of the Upsert operation</summary>
public Status Status { get; }
@@ -61,7 +58,7 @@ internal DeleteAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Va
FasterExecutionContext<Input, Output, Context> currentCtx, PendingContext<Input, Output, Context> pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
{
this.Status = new(StatusCode.Pending);
updateAsyncInternal = new UpdateAsyncInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>>(
updateAsyncInternal = new AsyncOperationInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>>(
fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new ());
}

@@ -74,7 +71,7 @@ public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(Cancel

/// <summary>Complete the Delete operation, issuing additional I/O synchronously if needed.</summary>
/// <returns>Status of Delete operation</returns>
public Status Complete() => this.Status.IsPending ? updateAsyncInternal.Complete().Status : this.Status;
public Status Complete() => this.Status.IsPending ? updateAsyncInternal.CompleteSync().Status : this.Status;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -89,20 +86,17 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input, Output, Context>(IFasterSession<Key, Value, Input, Output, Context> fasterSession,
FasterExecutionContext<Input, Output, Context> currentCtx, ref PendingContext<Input, Output, Context> pcontext, ref Key key, Context userContext, long serialNo, CancellationToken token)
{
CompletionEvent flushEvent;

fasterSession.UnsafeResumeThread();
try
{
OperationStatus internalStatus;
do
{
flushEvent = hlog.FlushEvent;
internalStatus = InternalDelete(ref key, ref userContext, ref pcontext, fasterSession, currentCtx, serialNo);
} while (internalStatus == OperationStatus.RETRY_NOW);
} while (HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pcontext));

if (OperationStatusUtils.TryConvertToStatusCode(internalStatus, out Status status))
return new ValueTask<DeleteAsyncResult<Input, Output, Context>>(new DeleteAsyncResult<Input, Output, Context>(new(internalStatus)));
if (OperationStatusUtils.TryConvertToCompletedStatusCode(internalStatus, out Status status))
return new ValueTask<DeleteAsyncResult<Input, Output, Context>>(new DeleteAsyncResult<Input, Output, Context>(status));
Debug.Assert(internalStatus == OperationStatus.ALLOCATE_FAILED);
}
finally
@@ -112,16 +106,17 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
fasterSession.UnsafeSuspendThread();
}

return SlowDeleteAsync(this, fasterSession, currentCtx, pcontext, flushEvent, token);
return SlowDeleteAsync(this, fasterSession, currentCtx, pcontext, token);
}

private static async ValueTask<DeleteAsyncResult<Input, Output, Context>> SlowDeleteAsync<Input, Output, Context>(
FasterKV<Key, Value> @this,
IFasterSession<Key, Value, Input, Output, Context> fasterSession,
FasterExecutionContext<Input, Output, Context> currentCtx,
PendingContext<Input, Output, Context> pcontext, CompletionEvent flushEvent, CancellationToken token = default)
PendingContext<Input, Output, Context> pcontext, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, currentCtx, flushEvent, token).ConfigureAwait(false);
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, pcontext.flushEvent, token).ConfigureAwait(false);
pcontext.flushEvent = default;
return new DeleteAsyncResult<Input, Output, Context>(@this, fasterSession, currentCtx, pcontext, exceptionDispatchInfo);
}
}
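
The hunks above replace `while (internalStatus == OperationStatus.RETRY_NOW)` with a loop driven by `HandleImmediateRetryStatus`. The body of that method is not shown on this page, so the following is only a minimal sketch of the loop shape, not FASTER's implementation. `SketchStatus`, `RetrySketch`, `HandleImmediateRetry`, `Run`, and the `refreshes` counter are hypothetical names introduced for illustration. The sketch assumes that both retry statuses cause another loop iteration, that the "retry later" case does some extra bookkeeping first (modeled here as a counter), and that any other status, including an allocation failure, exits the loop for the caller to handle.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified stand-in for FASTER's internal OperationStatus.
public enum SketchStatus { Success, RetryNow, RetryLater, AllocateFailed }

public static class RetrySketch
{
    // Returns true when the caller's do/while loop should run the operation again.
    // Assumption: RetryLater performs extra bookkeeping before retrying; the
    // 'refreshes' counter is only a placeholder for that work.
    public static bool HandleImmediateRetry(SketchStatus status, ref int refreshes)
    {
        switch (status)
        {
            case SketchStatus.RetryNow:
                Thread.Yield();   // the commit message mentions yielding on "retry now"
                return true;
            case SketchStatus.RetryLater:
                refreshes++;      // placeholder for work that lets another thread make progress
                Thread.Yield();
                return true;
            default:
                return false;     // completed, pending, or allocation failure: leave the loop
        }
    }

    // Invokes 'operation' until it returns a status that does not ask for a retry.
    public static SketchStatus Run(Func<SketchStatus> operation, out int attempts, out int refreshes)
    {
        attempts = 0;
        refreshes = 0;
        SketchStatus status;
        do
        {
            attempts++;
            status = operation();
        } while (HandleImmediateRetry(status, ref refreshes));
        return status;
    }
}
```

In this sketch, an operation that returns `RetryNow`, then `RetryLater`, then `Success` is invoked three times and `Run` returns `Success`. Keeping the retry decision in one helper means every call site shares the same policy, which is the consolidation the commit message describes.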