Skip to content

Commit

Permalink
[C# ]Add TryLock and TryPromoteSharedToExclusive (#848)
Browse files Browse the repository at this point in the history
* Add Try(Promote)Lock

* Add drain of writer to HashBucket.TryLockShared
Clean up the latching spinCount loops and add "readonly" to "get" accessors in HashBucket and RecordInfo

* remove stray SetDirty (it's set later)

* Add UpsertOptions, RMWOptions, and DeleteOptions, all with KeyHash, and add overloads of these operations to take this argument. Add KeyHash to ReadOptions and add more Read overloads to include ReadOptions.

* LockCode => KeyHash

* Clean up more LockCode -> KeyHash; handle null pool in SectorAlignedMemory.Return

* Move GetKeyHash from ILockableContext to IFasterContext and implement on all session types
  • Loading branch information
TedHartMS authored Aug 24, 2023
1 parent 7e41e49 commit 4d5412b
Show file tree
Hide file tree
Showing 37 changed files with 2,777 additions and 461 deletions.
28 changes: 19 additions & 9 deletions cs/src/core/Async/DeleteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
internal struct DeleteAsyncOperation<Input, Output, Context> : IAsyncOperation<Input, Output, Context, DeleteAsyncResult<Input, Output, Context>>
{
DeleteOptions deleteOptions;

internal DeleteAsyncOperation(ref DeleteOptions deleteOptions)
{
this.deleteOptions = deleteOptions;
}

/// <inheritdoc/>
public DeleteAsyncResult<Input, Output, Context> CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new DeleteAsyncResult<Input, Output, Context>(status);

Expand All @@ -21,9 +28,11 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
out Output output)
{
OperationStatus internalStatus;
ref var key = ref pendingContext.key.Get();
var keyHash = deleteOptions.KeyHash ?? fasterKV.comparer.GetHashCode64(ref key);
do
{
internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
internalStatus = fasterKV.InternalDelete(ref key, keyHash, ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
} while (fasterKV.HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
output = default;
return TranslateStatus(internalStatus);
Expand All @@ -32,7 +41,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
/// <inheritdoc/>
public ValueTask<DeleteAsyncResult<Input, Output, Context>> DoSlowOperation(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
PendingContext<Input, Output, Context> pendingContext, CancellationToken token)
=> SlowDeleteAsync(fasterKV, fasterSession, pendingContext, token);
=> SlowDeleteAsync(fasterKV, fasterSession, pendingContext, deleteOptions, token);

/// <inheritdoc/>
public bool HasPendingIO => false;
Expand All @@ -55,11 +64,11 @@ internal DeleteAsyncResult(Status status)
}

internal DeleteAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
PendingContext<Input, Output, Context> pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
PendingContext<Input, Output, Context> pendingContext, ref DeleteOptions deleteOptions, ExceptionDispatchInfo exceptionDispatchInfo)
{
this.Status = new(StatusCode.Pending);
updateAsyncInternal = new AsyncOperationInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>>(
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new ());
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (ref deleteOptions));
}

/// <summary>Complete the Delete operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
Expand All @@ -76,7 +85,7 @@ public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(Cancel

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input, Output, Context, FasterSession>(FasterSession fasterSession,
ref Key key, Context userContext, long serialNo, CancellationToken token = default)
ref Key key, ref DeleteOptions deleteOptions, Context userContext, long serialNo, CancellationToken token = default)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
Expand All @@ -85,9 +94,10 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
try
{
OperationStatus internalStatus;
var keyHash = deleteOptions.KeyHash ?? comparer.GetHashCode64(ref key);
do
{
internalStatus = InternalDelete(ref key, ref userContext, ref pcontext, fasterSession, serialNo);
internalStatus = InternalDelete(ref key, keyHash, ref userContext, ref pcontext, fasterSession, serialNo);
} while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));

if (OperationStatusUtils.TryConvertToCompletedStatusCode(internalStatus, out Status status))
Expand All @@ -101,17 +111,17 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
fasterSession.UnsafeSuspendThread();
}

return SlowDeleteAsync(this, fasterSession, pcontext, token);
return SlowDeleteAsync(this, fasterSession, pcontext, deleteOptions, token);
}

private static async ValueTask<DeleteAsyncResult<Input, Output, Context>> SlowDeleteAsync<Input, Output, Context>(
FasterKV<Key, Value> @this,
IFasterSession<Key, Value, Input, Output, Context> fasterSession,
PendingContext<Input, Output, Context> pcontext, CancellationToken token = default)
PendingContext<Input, Output, Context> pcontext, DeleteOptions deleteOptions, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, pcontext.flushEvent, token).ConfigureAwait(false);
pcontext.flushEvent = default;
return new DeleteAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, exceptionDispatchInfo);
return new DeleteAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, ref deleteOptions, exceptionDispatchInfo);
}
}
}
33 changes: 20 additions & 13 deletions cs/src/core/Async/RMWAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
internal struct RmwAsyncOperation<Input, Output, Context> : IAsyncOperation<Input, Output, Context, RmwAsyncResult<Input, Output, Context>>
{
AsyncIOContext<Key, Value> diskRequest;
internal RmwAsyncOperation(AsyncIOContext<Key, Value> diskRequest) => this.diskRequest = diskRequest;
RMWOptions rmwOptions;

internal RmwAsyncOperation(AsyncIOContext<Key, Value> diskRequest, ref RMWOptions rmwOptions)
{
this.diskRequest = diskRequest;
this.rmwOptions = rmwOptions;
}

/// <inheritdoc/>
public RmwAsyncResult<Input, Output, Context> CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new(status, output, recordMetadata);
Expand All @@ -25,8 +31,8 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
{
Status status = !this.diskRequest.IsDefault()
? fasterKV.InternalCompletePendingRequestFromContext(fasterSession, this.diskRequest, ref pendingContext, out AsyncIOContext<Key, Value> newDiskRequest)
: fasterKV.CallInternalRMW(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, pendingContext.userContext,
pendingContext.serialNum, out newDiskRequest);
: fasterKV.CallInternalRMW(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, ref this.rmwOptions,
pendingContext.userContext, pendingContext.serialNum, out newDiskRequest);
output = pendingContext.output;
this.diskRequest = newDiskRequest;
return status;
Expand All @@ -35,7 +41,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
/// <inheritdoc/>
public ValueTask<RmwAsyncResult<Input, Output, Context>> DoSlowOperation(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
PendingContext<Input, Output, Context> pendingContext, CancellationToken token)
=> SlowRmwAsync(fasterKV, fasterSession, pendingContext, diskRequest, token);
=> SlowRmwAsync(fasterKV, fasterSession, pendingContext, rmwOptions, diskRequest, token);

/// <inheritdoc/>
public bool HasPendingIO => !this.diskRequest.IsDefault();
Expand Down Expand Up @@ -67,13 +73,13 @@ internal RmwAsyncResult(Status status, TOutput output, RecordMetadata recordMeta
}

internal RmwAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, TOutput, Context> fasterSession,
PendingContext<Input, TOutput, Context> pendingContext, AsyncIOContext<Key, Value> diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
PendingContext<Input, TOutput, Context> pendingContext, ref RMWOptions rmwOptions, AsyncIOContext<Key, Value> diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
{
Status = new(StatusCode.Pending);
this.Output = default;
this.RecordMetadata = default;
updateAsyncInternal = new AsyncOperationInternal<Input, TOutput, Context, RmwAsyncOperation<Input, TOutput, Context>, RmwAsyncResult<Input, TOutput, Context>>(
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (diskRequest));
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (diskRequest, ref rmwOptions));
}

/// <summary>Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
Expand Down Expand Up @@ -105,7 +111,7 @@ public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(Cancella

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Output, Context, FasterSession>(FasterSession fasterSession,
ref Key key, ref Input input, Context context, long serialNo, CancellationToken token = default)
ref Key key, ref Input input, ref RMWOptions rmwOptions, Context context, long serialNo, CancellationToken token = default)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
Expand All @@ -115,7 +121,7 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
try
{
Output output = default;
var status = CallInternalRMW(fasterSession, ref pcontext, ref key, ref input, ref output, context, serialNo, out diskRequest);
var status = CallInternalRMW(fasterSession, ref pcontext, ref key, ref input, ref output, ref rmwOptions, context, serialNo, out diskRequest);
if (!status.IsPending)
return new ValueTask<RmwAsyncResult<Input, Output, Context>>(new RmwAsyncResult<Input, Output, Context>(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
Expand All @@ -126,29 +132,30 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
fasterSession.UnsafeSuspendThread();
}

return SlowRmwAsync(this, fasterSession, pcontext, diskRequest, token);
return SlowRmwAsync(this, fasterSession, pcontext, rmwOptions, diskRequest, token);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private Status CallInternalRMW<Input, Output, Context>(IFasterSession<Key, Value, Input, Output, Context> fasterSession, ref PendingContext<Input, Output, Context> pcontext,
ref Key key, ref Input input, ref Output output, Context context, long serialNo, out AsyncIOContext<Key, Value> diskRequest)
ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, Context context, long serialNo, out AsyncIOContext<Key, Value> diskRequest)
{
OperationStatus internalStatus;
var keyHash = rmwOptions.KeyHash ?? comparer.GetHashCode64(ref key);
do
internalStatus = InternalRMW(ref key, ref input, ref output, ref context, ref pcontext, fasterSession, serialNo);
internalStatus = InternalRMW(ref key, keyHash, ref input, ref output, ref context, ref pcontext, fasterSession, serialNo);
while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));

return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);
}

private static async ValueTask<RmwAsyncResult<Input, Output, Context>> SlowRmwAsync<Input, Output, Context>(
FasterKV<Key, Value> @this, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
PendingContext<Input, Output, Context> pcontext, AsyncIOContext<Key, Value> diskRequest, CancellationToken token = default)
PendingContext<Input, Output, Context> pcontext, RMWOptions rmwOptions, AsyncIOContext<Key, Value> diskRequest, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo;
(diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, fasterSession.Ctx, pcontext.flushEvent, diskRequest, token);
pcontext.flushEvent = default;
return new RmwAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, diskRequest, exceptionDispatchInfo);
return new RmwAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, ref rmwOptions, diskRequest, exceptionDispatchInfo);
}
}
}
3 changes: 2 additions & 1 deletion cs/src/core/Async/ReadAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ private Status CallInternalRead<Input, Output, Context>(IFasterSession<Key, Valu
out AsyncIOContext<Key, Value> diskRequest)
{
OperationStatus internalStatus;
var keyHash = readOptions.KeyHash ?? comparer.GetHashCode64(ref key);
do
internalStatus = InternalRead(ref key, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo);
internalStatus = InternalRead(ref key, keyHash, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo);
while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));

return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);
Expand Down
Loading

0 comments on commit 4d5412b

Please sign in to comment.