Skip to content

Commit

Permalink
Updates to support sessions natively without thread-local
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 12, 2019
1 parent 6cab32a commit 78c2649
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 381 deletions.
30 changes: 15 additions & 15 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private void RunYcsb(int thread_idx)
int count = 0;
#endif

store.StartSession();
var session = store.StartClientSession(false);

while (!done)
{
Expand All @@ -135,25 +135,25 @@ private void RunYcsb(int thread_idx)

if (idx % 256 == 0)
{
store.Refresh();
session.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
session.CompletePending(false);
}
}

switch (op)
{
case Op.Upsert:
{
store.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
++writes_done;
break;
}
case Op.Read:
{
Status result = store.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
Status result = session.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
if (result == Status.OK)
{
++reads_done;
Expand All @@ -162,7 +162,7 @@ private void RunYcsb(int thread_idx)
}
case Op.ReadModifyWrite:
{
Status result = store.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
Status result = session.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
if (result == Status.OK)
{
++writes_done;
Expand Down Expand Up @@ -191,8 +191,9 @@ private void RunYcsb(int thread_idx)
#endif
}

store.CompletePending(true);
store.StopSession();
session.CompletePending(true);
session.Dispose();

sw.Stop();

Console.WriteLine("Thread " + thread_idx + " done; " + reads_done + " reads, " +
Expand Down Expand Up @@ -311,7 +312,7 @@ private void SetupYcsb(int thread_idx)
else
Native32.AffinitizeThreadShardedNuma((uint)thread_idx, 2); // assuming two NUMA sockets

store.StartSession();
var session = store.StartClientSession(false);

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand All @@ -330,15 +331,15 @@ private void SetupYcsb(int thread_idx)
{
if (idx % 256 == 0)
{
store.Refresh();
session.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
session.CompletePending(false);
}
}

store.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
}
#if DASHBOARD
count += (int)kChunkSize;
Expand All @@ -356,9 +357,8 @@ private void SetupYcsb(int thread_idx)
#endif
}


store.CompletePending(true);
store.StopSession();
session.CompletePending(true);
session.Dispose();
}

#if DASHBOARD
Expand Down
97 changes: 58 additions & 39 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,31 @@ namespace FASTER.core
/// <typeparam name="Output"></typeparam>
/// <typeparam name="Context"></typeparam>
/// <typeparam name="Functions"></typeparam>
public class ClientSession<Key, Value, Input, Output, Context, Functions> : IDisposable
public sealed class ClientSession<Key, Value, Input, Output, Context, Functions> : IDisposable
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
internal FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx;
internal FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;
private readonly bool supportAsync = false;
private readonly FasterKV<Key, Value, Input, Output, Context, Functions> fht;
internal readonly FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;

internal ClientSession(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx)
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx,
bool supportAsync)
{
this.fht = fht;
this.prevCtx = prevCtx;
this.ctx = ctx;
this.supportAsync = supportAsync;
if (supportAsync)
{
fht.UseRelaxedCPR();
}
else
{
UnsafeResumeThread();
}
}

/// <summary>
Expand All @@ -46,23 +54,23 @@ internal ClientSession(
/// </summary>
public void Dispose()
{
TryCompletePending(true);
CompletePending(true);
fht.DisposeClientSession(ID);
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
/// </summary>
public void UnsafeResumeThread()
internal void UnsafeResumeThread()
{
fht.ResumeSession(prevCtx, ctx);
fht.ResumeSession(ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
public void UnsafeSuspendThread()
internal void UnsafeSuspendThread()
{
fht.SuspendSession();
}
Expand All @@ -74,13 +82,13 @@ public void UnsafeSuspendThread()
/// <param name="input"></param>
/// <param name="output"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long serialNo)
{
UnsafeResumeThread();
var status = fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
UnsafeSuspendThread();
if (supportAsync) UnsafeResumeThread();
var status = fht.Read(ref key, ref input, ref output, userContext, serialNo, ctx);
if (supportAsync) UnsafeSuspendThread();
return status;
}

Expand All @@ -90,13 +98,13 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long serialNo)
{
UnsafeResumeThread();
var status = fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
UnsafeSuspendThread();
if (supportAsync) UnsafeResumeThread();
var status = fht.Upsert(ref key, ref desiredValue, userContext, serialNo, ctx);
if (supportAsync) UnsafeSuspendThread();
return status;
}

Expand All @@ -106,13 +114,13 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
public Status RMW(ref Key key, ref Input input, Context userContext, long monotonicSerialNum)
public Status RMW(ref Key key, ref Input input, Context userContext, long serialNo)
{
UnsafeResumeThread();
var status = fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
UnsafeSuspendThread();
if (supportAsync) UnsafeResumeThread();
var status = fht.RMW(ref key, ref input, userContext, serialNo, ctx);
if (supportAsync) UnsafeSuspendThread();
return status;
}

Expand All @@ -121,27 +129,36 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
/// </summary>
/// <param name="key"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <param name="serialNo"></param>
/// <returns></returns>
public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
public Status Delete(ref Key key, Context userContext, long serialNo)
{
UnsafeResumeThread();
var status = fht.Delete(ref key, userContext, monotonicSerialNum);
UnsafeSuspendThread();
if (supportAsync) UnsafeResumeThread();
var status = fht.Delete(ref key, userContext, serialNo);
if (supportAsync) UnsafeSuspendThread();
return status;
}

/// <summary>
/// Refresh session, handling checkpointing if needed
/// </summary>
public void Refresh()
{
if (supportAsync) UnsafeResumeThread();
fht.InternalRefresh(ctx);
if (supportAsync) UnsafeSuspendThread();
}

/// <summary>
/// Sync complete outstanding pending operations
/// </summary>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool TryCompletePending(bool spinWait = false)
public bool CompletePending(bool spinWait = false)
{
UnsafeResumeThread();
var result = fht.CompletePending(spinWait);
UnsafeSuspendThread();
if (supportAsync) UnsafeResumeThread();
var result = fht.InternalCompletePending(ctx, spinWait);
if (supportAsync) UnsafeSuspendThread();
return result;
}

Expand All @@ -151,6 +168,7 @@ public bool TryCompletePending(bool spinWait = false)
/// <returns></returns>
public async ValueTask CompletePendingAsync()
{
if (!supportAsync) throw new NotSupportedException();
await fht.CompletePendingAsync(this);
}

Expand All @@ -159,11 +177,11 @@ public async ValueTask CompletePendingAsync()
/// </summary>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool TryCompleteCheckpoint(bool spinWait = false)
public bool CompleteCheckpoint(bool spinWait = false)
{
UnsafeResumeThread();
if (supportAsync) UnsafeResumeThread();
var result = fht.CompleteCheckpoint(spinWait);
UnsafeSuspendThread();
if (supportAsync) UnsafeSuspendThread();
return result;
}

Expand All @@ -173,7 +191,8 @@ public bool TryCompleteCheckpoint(bool spinWait = false)
/// <returns></returns>
internal async ValueTask CompleteCheckpointAsync()
{
await fht.CompleteCheckpointAsync(this);
if (!supportAsync) throw new NotSupportedException();
await fht.CompleteCheckpointAsync(ctx, this);
}
}
}
Loading

0 comments on commit 78c2649

Please sign in to comment.