Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Convert *UnsafeContext to structs and acquire on demand #750

Merged
merged 3 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions cs/benchmark/FasterSpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ private void RunYcsbUnsafeContext(int thread_idx)
#endif

var session = store.For(functions).NewSession<FunctionsSB>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

try
{
Expand Down Expand Up @@ -203,10 +203,9 @@ private void RunYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}

uContext.Dispose();
session.Dispose();

sw.Stop();
Expand Down Expand Up @@ -479,8 +478,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
waiter.Wait();

var session = store.For(functions).NewSession<FunctionsSB>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand Down Expand Up @@ -531,9 +530,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}
uContext.Dispose();
session.Dispose();
}

Expand Down
19 changes: 9 additions & 10 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ private void RunYcsbUnsafeContext(int thread_idx)
#endif

var session = store.For(functions).NewSession<Functions>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

try
{
Expand Down Expand Up @@ -199,10 +199,9 @@ private void RunYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}

uContext.Dispose();
session.Dispose();

sw.Stop();
Expand Down Expand Up @@ -319,7 +318,8 @@ internal unsafe (double, double) Run(TestLoader testLoader)
if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
{
session = store.For(functions).NewSession<Functions>();
luContext = session.GetLockableUnsafeContext();
luContext = session.LockableUnsafeContext;
luContext.BeginLockable();

Console.WriteLine("Taking 2 manual locks");
luContext.Lock(xlock.key, xlock.kind);
Expand Down Expand Up @@ -436,7 +436,7 @@ internal unsafe (double, double) Run(TestLoader testLoader)
{
luContext.Unlock(xlock.key, xlock.kind);
luContext.Unlock(slock.key, slock.kind);
luContext.Dispose();
luContext.EndLockable();
session.Dispose();
}

Expand Down Expand Up @@ -467,8 +467,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
waiter.Wait();

var session = store.For(functions).NewSession<Functions>();
var uContext = session.GetUnsafeContext();
uContext.ResumeThread();
var uContext = session.UnsafeContext;
uContext.BeginUnsafe();

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand Down Expand Up @@ -518,9 +518,8 @@ private void SetupYcsbUnsafeContext(int thread_idx)
}
finally
{
uContext.SuspendThread();
uContext.EndUnsafe();
}
uContext.Dispose();
session.Dispose();
}

Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/ClientSession/BasicContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ namespace FASTER.core
{
readonly ClientSession<Key, Value, Input, Output, Context, Functions> clientSession;

/// <summary>Indicates whether this struct has been initialized</summary>
public bool IsNull => this.clientSession is null;

internal BasicContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
this.clientSession = clientSession;
Expand Down
114 changes: 72 additions & 42 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -35,8 +36,8 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>

internal readonly InternalFasterSession FasterSession;

UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext;
LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext;
readonly UnsafeContext<Key, Value, Input, Output, Context, Functions> uContext;
readonly LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> luContext;
readonly LockableContext<Key, Value, Input, Output, Context, Functions> lContext;
readonly BasicContext<Key, Value, Input, Output, Context, Functions> bContext;

Expand All @@ -49,32 +50,53 @@ public sealed class ClientSession<Key, Value, Input, Output, Context, Functions>
internal ulong sharedLockCount;
internal ulong exclusiveLockCount;

bool isAcquired;
bool isAcquiredLockable;

internal void Acquire()
internal void AcquireLockable()
{
CheckNotAcquired();
fht.IncrementNumLockingSessions();
isAcquired = true;
CheckIsNotAcquiredLockable();

while (true)
{
// Checkpoints cannot complete while we have active locking sessions.
while (IsInPreparePhase())
Thread.Yield();

fht.IncrementNumLockingSessions();
isAcquiredLockable = true;

if (!IsInPreparePhase())
break;
InternalReleaseLockable();
Thread.Yield();
}
}

internal void Release()
internal void ReleaseLockable()
{
CheckAcquired();
isAcquired = false;
CheckIsAcquiredLockable();
if (TotalLockCount > 0)
throw new FasterException($"EndLockable called with locks held: {sharedLockCount} shared locks, {exclusiveLockCount} exclusive locks");
InternalReleaseLockable();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InternalReleaseLockable()
{
isAcquiredLockable = false;
fht.DecrementNumLockingSessions();
}

internal void CheckAcquired()
internal void CheckIsAcquiredLockable()
{
if (!isAcquired)
throw new FasterException("Method call on not-acquired Context");
if (!isAcquiredLockable)
throw new FasterException("Lockable method call when BeginLockable has not been called");
}

void CheckNotAcquired()
void CheckIsNotAcquiredLockable()
{
if (isAcquired)
throw new FasterException("Method call on acquired Context");
if (isAcquiredLockable)
throw new FasterException("BeginLockable cannot be called twice (call EndLockable first)");
}

internal ClientSession(
Expand All @@ -86,6 +108,8 @@ internal ClientSession(
{
this.lContext = new(this);
this.bContext = new(this);
this.luContext = new(this);
this.uContext = new(this);

this.loggerFactory = loggerFactory;
this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode():X8}");
Expand Down Expand Up @@ -199,22 +223,12 @@ public void Dispose()
/// <summary>
/// Return a new interface to Faster operations that supports manual epoch control.
/// </summary>
public UnsafeContext<Key, Value, Input, Output, Context, Functions> GetUnsafeContext()
{
this.uContext ??= new(this);
this.uContext.Acquire();
return this.uContext;
}
public UnsafeContext<Key, Value, Input, Output, Context, Functions> UnsafeContext => uContext;

/// <summary>
/// Return a new interface to Faster operations that supports manual locking and epoch control.
/// </summary>
public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> GetLockableUnsafeContext()
{
this.luContext ??= new(this);
this.luContext.Acquire();
return this.luContext;
}
public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> LockableUnsafeContext => luContext;

/// <summary>
/// Return a session wrapper that supports manual locking.
Expand Down Expand Up @@ -655,40 +669,51 @@ public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = def
#region Other Operations

/// <inheritdoc/>
public unsafe void ResetModified(ref Key key)
public void ResetModified(ref Key key)
{
UnsafeResumeThread();
try
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out _);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
UnsafeResetModified(ref key);
}
finally
{
UnsafeSuspendThread();
}
}

internal void UnsafeResetModified(ref Key key)
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out _);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
}

/// <inheritdoc/>
public unsafe void ResetModified(Key key) => ResetModified(ref key);

/// <inheritdoc/>
internal unsafe bool IsModified(ref Key key)
internal bool IsModified(ref Key key)
{
RecordInfo modifiedInfo;
UnsafeResumeThread();
try
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
return UnsafeIsModified(ref key);
}
finally
{
UnsafeSuspendThread();
}
}

internal bool UnsafeIsModified(ref Key key)
{
RecordInfo modifiedInfo;
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false);
while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
return modifiedInfo.Modified;
}

Expand Down Expand Up @@ -848,6 +873,8 @@ public IFasterScanIterator<Key, Value> Iterate(long untilAddress = -1)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeResumeThread()
{
// We do not track any "acquired" state here; if someone mixes calls between safe and unsafe contexts, they will
// get the "trying to acquire already-acquired epoch" error.
fht.epoch.Resume();
fht.InternalRefresh(ctx, FasterSession);
}
Expand All @@ -856,8 +883,11 @@ internal void UnsafeResumeThread()
/// Suspend session on current thread
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void UnsafeSuspendThread()
=> fht.epoch.Suspend();
internal void UnsafeSuspendThread()
{
Debug.Assert(fht.epoch.ThisInstanceProtected());
fht.epoch.Suspend();
}

void IClientSession.AtomicSwitch(long version)
{
Expand All @@ -867,7 +897,7 @@ void IClientSession.AtomicSwitch(long version)
/// <summary>
/// Return true if Faster State Machine is in PREPARE sate
/// </summary>
public bool IsInPreparePhase()
internal bool IsInPreparePhase()
{
return this.fht.SystemState.Phase == Phase.PREPARE;
}
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/ClientSession/ILockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ namespace FASTER.core
/// <typeparam name="TKey"></typeparam>
public interface ILockableContext<TKey>
{
/// <summary>
/// Begins a series of lock operations on possibly multiple keys; call before any locks are taken.
/// </summary>
void BeginLockable();

/// <summary>
/// Ends a series of lock operations on possibly multiple keys; call after all locks are released.
/// </summary>
void EndLockable();

/// <summary>
/// Lock the key with the specified <paramref name="lockType"/>, waiting until it is acquired
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/ClientSession/IUnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace FASTER.core
public interface IUnsafeContext
{
/// <summary>
/// Resume session on current thread. IMPORTANT: Call SuspendThread before any async op.
/// Resume session on current thread. IMPORTANT: Call <see cref="EndUnsafe"/> before any async op.
/// </summary>
void ResumeThread();
void BeginUnsafe();

/// <summary>
/// Suspend session on current thread
/// </summary>
void SuspendThread();
void EndUnsafe();
}
}
Loading