Skip to content

Commit

Permalink
[C#] Do not auto-acquire lockable context (#740)
Browse files Browse the repository at this point in the history
* Do not auto-acquire lockable context
Expose Acquire/Release for LC
FasterLogIterator dispose fix

* Added BasicContext as struct wrapper for client session
Convert LockableContext to struct

* add getters for session contexts

* updates

* updates
  • Loading branch information
badrishc authored Aug 27, 2022
1 parent 5ed8d46 commit 5d4c67d
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 110 deletions.
228 changes: 228 additions & 0 deletions cs/src/core/ClientSession/BasicContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
/// <summary>
/// Basic Faster Context implementation.
/// </summary>
public readonly struct BasicContext<Key, Value, Input, Output, Context, Functions> : IFasterContext<Key, Value, Input, Output, Context>
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession;

internal BasicContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
this.clientSession = clientSession;
}

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void UnsafeResumeThread()
=> clientSession.UnsafeResumeThread();

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void UnsafeResumeThread(out int resumeEpoch)
=> clientSession.UnsafeResumeThread(out resumeEpoch);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void UnsafeSuspendThread()
=> clientSession.UnsafeSuspendThread();

#region IFasterContext

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
=> clientSession.CompletePending(wait, spinWaitForCommit);

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> clientSession.CompletePendingAsync(waitForCommit, token);

/// <inheritdoc/>
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
=> clientSession.CompletePendingWithOutputsAsync(waitForCommit, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Read(ref key, ref input, ref output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Read(key, input, out output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Read(ref key, ref output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(Key key, out Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Read(key, out output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public (Status status, Output output) Read(Key key, Context userContext = default, long serialNo = 0)
=> clientSession.Read(key, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
=> clientSession.Read(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0)
=> clientSession.ReadAtAddress(ref input, ref output, ref readOptions, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(ref Key key, ref Input input, Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
=> clientSession.ReadAsync(ref key, ref input, userContext, serialNo, cancellationToken);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.ReadAsync(key, input, context, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.ReadAsync(ref key, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(Key key, Context context = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.ReadAsync(key, context, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAsync(ref Key key, ref Input input, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
=> clientSession.ReadAsync(ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Context>> ReadAtAddressAsync(ref Input input, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
=> clientSession.ReadAtAddressAsync(ref input, ref readOptions, userContext, serialNo, cancellationToken);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
=> clientSession.Upsert(ref key, ref desiredValue, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Upsert(ref key, ref input, ref desiredValue, ref output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
=> clientSession.Upsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(Key key, Value desiredValue, Context userContext = default, long serialNo = 0)
=> clientSession.Upsert(key, desiredValue, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(Key key, Input input, Value desiredValue, ref Output output, Context userContext = default, long serialNo = 0)
=> clientSession.Upsert(key, input, desiredValue, ref output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.UpsertAsync(ref key, ref desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.UpsertAsync(ref key, ref input, ref desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(Key key, Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.UpsertAsync(key, desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.UpsertAsyncResult<Input, Output, Context>> UpsertAsync(Key key, Input input, Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.UpsertAsync(key, input, desiredValue, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
=> clientSession.RMW(ref key, ref input, ref output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, ref Output output, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
=> clientSession.RMW(ref key, ref input, ref output, out recordMetadata, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0)
=> clientSession.RMW(key, input, out output, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
=> clientSession.RMW(ref key, ref input, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status RMW(Key key, Input input, Context userContext = default, long serialNo = 0)
=> clientSession.RMW(key, input, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.RmwAsyncResult<Input, Output, Context>> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.RMWAsync(ref key, ref input, context, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.RmwAsyncResult<Input, Output, Context>> RMWAsync(Key key, Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.RMWAsync(key, input, context, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(ref Key key, Context userContext = default, long serialNo = 0)
=> clientSession.Delete(ref key, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Delete(Key key, Context userContext = default, long serialNo = 0)
=> clientSession.Delete(key, userContext, serialNo);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.DeleteAsync(ref key, userContext, serialNo, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<FasterKV<Key, Value>.DeleteAsyncResult<Input, Output, Context>> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
=> clientSession.DeleteAsync(key, userContext, serialNo, token);

/// <inheritdoc/>
public void Refresh()
=> clientSession.Refresh();

#endregion IFasterContext
}
}
63 changes: 52 additions & 11 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,63 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>

UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext;
LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext;
LockableContext<Key, Value, Input, Output, Context, Functions> lContext;
readonly LockableContext<Key, Value, Input, Output, Context, Functions> lContext;
readonly BasicContext<Key, Value, Input, Output, Context, Functions> bContext;

internal const string NotAsyncSessionErr = "Session does not support async operations";

readonly ILoggerFactory loggerFactory;
readonly ILogger logger;

internal ulong TotalLockCount => sharedLockCount + exclusiveLockCount;
internal ulong sharedLockCount;
internal ulong exclusiveLockCount;

bool isAcquired;

internal void Acquire()
{
CheckNotAcquired();
fht.IncrementNumLockingSessions();
isAcquired = true;
}

internal void Release()
{
CheckAcquired();
isAcquired = false;
fht.DecrementNumLockingSessions();
}

internal void CheckAcquired()
{
if (!isAcquired)
throw new FasterException("Method call on not-acquired Context");
}

void CheckNotAcquired()
{
if (isAcquired)
throw new FasterException("Method call on acquired Context");
}

/// <summary>
/// Local current epoch
/// </summary>
public int LocalCurrentEpoch => fht.epoch.LocalCurrentEpoch;

internal ClientSession(
FasterKV<Key, Value> fht,
FasterKV<Key, Value>.FasterExecutionContext<Input, Output, Context> ctx,
Functions functions,
SessionVariableLengthStructSettings<Value, Input> sessionVariableLengthStructSettings,
ILoggerFactory loggerFactory = null)
{
this.lContext = new(this);
this.bContext = new(this);

this.loggerFactory = loggerFactory;
this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode().ToString("X8")}");
this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode():X8}");
this.fht = fht;
this.ctx = ctx;
this.functions = functions;
Expand All @@ -72,7 +113,7 @@ internal ClientSession(
}
else
{
if (!(fht.hlog is VariableLengthBlittableAllocator<Key, Value>))
if (fht.hlog is not VariableLengthBlittableAllocator<Key, Value>)
logger?.LogWarning("Warning: Session param of variableLengthStruct provided for non-varlen allocator");
}

Expand Down Expand Up @@ -101,7 +142,7 @@ internal ClientSession(

private void UpdateVarlen(ref IVariableLengthStruct<Value, Input> variableLengthStruct)
{
if (!(fht.hlog is VariableLengthBlittableAllocator<Key, Value>))
if (fht.hlog is not VariableLengthBlittableAllocator<Key, Value>)
return;

if (typeof(Value) == typeof(SpanByte) && typeof(Input) == typeof(SpanByte))
Expand Down Expand Up @@ -181,14 +222,14 @@ public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> GetL
}

/// <summary>
/// Return a new interface to Faster operations that supports manual locking.
/// Return a session wrapper that supports manual locking.
/// </summary>
public LockableContext<Key, Value, Input, Output, Context, Functions> GetLockableContext()
{
this.lContext ??= new(this);
this.lContext.Acquire();
return this.lContext;
}
public LockableContext<Key, Value, Input, Output, Context, Functions> LockableContext => lContext;

/// <summary>
/// Return a session wrapper struct that passes through to client session
/// </summary>
public BasicContext<Key, Value, Input, Output, Context, Functions> BasicContext => bContext;

#region IFasterContext
/// <inheritdoc/>
Expand Down
Loading

0 comments on commit 5d4c67d

Please sign in to comment.