Skip to content

Commit

Permalink
Support suspend/resume sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 6, 2019
1 parent bcb75e7 commit 2eafe9b
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 63 deletions.
3 changes: 1 addition & 2 deletions cs/benchmark/ConcurrentDictionaryBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ public enum Op : ulong
Input[] input_;
Input* input_ptr;

ConcurrentDictionary<Key, Value> store;
readonly ConcurrentDictionary<Key, Value> store;

long total_ops_done = 0;

const string kKeyWorkload = "a";
readonly int threadCount;
readonly int numaStyle;
readonly string distribution;
Expand Down
3 changes: 1 addition & 2 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ public enum Op : ulong
Input[] input_;
readonly IDevice device;

FasterKV<Key, Value, Input, Output, Empty, Functions> store;
readonly FasterKV<Key, Value, Input, Output, Empty, Functions> store;

long total_ops_done = 0;

const string kKeyWorkload = "a";
readonly int threadCount;
readonly int numaStyle;
readonly string distribution;
Expand Down
30 changes: 24 additions & 6 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,15 @@ public void Dispose()
/// </summary>
public void Resume()
{
fht.SetContext(prevCtx, ctx);
fht.ResumeSession(prevCtx, ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
public void Suspend()
{
fht.SuspendSession();
}

/// <summary>
Expand All @@ -70,7 +78,9 @@ public void Resume()
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
var status = fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
Suspend();
return status;
}

/// <summary>
Expand All @@ -84,7 +94,9 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
var status = fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
Suspend();
return status;
}

/// <summary>
Expand All @@ -98,7 +110,9 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
public Status RMW(ref Key key, ref Input input, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
var status = fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
Suspend();
return status;
}

/// <summary>
Expand All @@ -111,7 +125,9 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
{
Resume();
return fht.Delete(ref key, userContext, monotonicSerialNum);
var status = fht.Delete(ref key, userContext, monotonicSerialNum);
Suspend();
return status;
}

/// <summary>
Expand All @@ -122,7 +138,9 @@ public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
public bool CompletePending(bool wait = false)
{
Resume();
return fht.CompletePending(wait);
var result = fht.CompletePending(wait);
Suspend();
return result;
}
}
}
48 changes: 16 additions & 32 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/// <summary>
/// Start new shared (not thread-specific) session with FASTER
/// Start new shared (not thread-specific) session with FASTER.
/// Session starts in dormant state.
/// </summary>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession()
Expand All @@ -29,46 +30,29 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientS
}


internal void SetContext(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx)
/// <summary>
/// Resume session with FASTER
/// </summary>
/// <param name="prevThreadCtx"></param>
/// <param name="threadCtx"></param>
internal void ResumeSession(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx)
{
if (!epoch.ThreadEntry.IsInitializedForThread)
{
epoch.Acquire();
epoch.ThreadEntry.InitializeThread();
this.prevThreadCtx.InitializeThread();
this.threadCtx.InitializeThread();
}
epoch.Resume();

// Copy contexts to thread-local
this.prevThreadCtx.InitializeThread();
this.threadCtx.InitializeThread();

this.prevThreadCtx.Value = prevThreadCtx;
this.threadCtx.Value = threadCtx;
Refresh();
}

/// <summary>
/// Suspend session with FASTER
/// </summary>
internal void SuspendSession(bool completePending = false)
internal void SuspendSession()
{
if (completePending)
{
while (true)
{
bool done = true;
if (threadCtx.Value.retryRequests.Count != 0 || threadCtx.Value.ioPendingRequests.Count != 0)
done = false;

if (prevThreadCtx.Value != default(FasterExecutionContext))
{
if (prevThreadCtx.Value.retryRequests.Count != 0 || prevThreadCtx.Value.ioPendingRequests.Count != 0)
done = false;
}
if (threadCtx.Value.phase != Phase.REST)
done = false;

if (done) break;
Refresh();
}
}
epoch.Release();
epoch.Suspend();
}
}
}
12 changes: 12 additions & 0 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ public void Suspend()
(*(tableAligned + entry)).localCurrentEpoch = int.MaxValue;
}

/// <summary>
/// Thread resumes its epoch entry
/// </summary>
public void Resume()
{
if (!threadEntryIndex.IsInitializedForThread || threadEntryIndex.Value == kInvalidIndex)
{
Acquire();
}
ProtectAndDrain();
}

internal FastThreadLocal<int> ThreadEntry => threadEntryIndex;

/// <summary>
Expand Down
6 changes: 4 additions & 2 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager)
if (metadata == null)
throw new Exception("Invalid log commit metadata for ID " + token.ToString());

Initialize(new StreamReader(new MemoryStream(metadata)));
using (var s = new StreamReader(new MemoryStream(metadata)))
Initialize(s);
}

/// <summary>
Expand Down Expand Up @@ -402,7 +403,8 @@ public void Recover(Guid guid, ICheckpointManager checkpointManager)
var metadata = checkpointManager.GetIndexCommitMetadata(guid);
if (metadata == null)
throw new Exception("Invalid index commit metadata for ID " + guid.ToString());
Initialize(new StreamReader(new MemoryStream(metadata)));
using (var s = new StreamReader(new MemoryStream(metadata)))
Initialize(s);
}

public byte[] ToByteArray()
Expand Down
32 changes: 27 additions & 5 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

Expand Down Expand Up @@ -69,8 +70,8 @@ private enum CheckpointType

private ConcurrentDictionary<Guid, long> _recoveredSessions;

private FastThreadLocal<FasterExecutionContext> prevThreadCtx;
private FastThreadLocal<FasterExecutionContext> threadCtx;
private readonly FastThreadLocal<FasterExecutionContext> prevThreadCtx;
private readonly FastThreadLocal<FasterExecutionContext> threadCtx;


/// <summary>
Expand Down Expand Up @@ -322,6 +323,26 @@ public bool CompletePending(bool wait = false)
return InternalCompletePending(wait);
}

/// <summary>
/// Get list of pending requests (for local session)
/// </summary>
/// <returns></returns>
public IEnumerable<long> GetPendingRequests()
{

foreach (var kvp in prevThreadCtx.Value.ioPendingRequests)
yield return kvp.Value.serialNum;

foreach (var val in prevThreadCtx.Value.retryRequests)
yield return val.serialNum;

foreach (var kvp in threadCtx.Value.ioPendingRequests)
yield return kvp.Value.serialNum;

foreach (var val in threadCtx.Value.retryRequests)
yield return val.serialNum;
}

/// <summary>
/// Complete outstanding pending operations
/// </summary>
Expand Down Expand Up @@ -383,7 +404,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
{
var context = default(PendingContext);
var internalStatus = InternalRead(ref key, ref input, ref output, ref userContext, ref context);
var status = default(Status);
Status status;
if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status)internalStatus;
Expand All @@ -409,7 +430,7 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
{
var context = default(PendingContext);
var internalStatus = InternalUpsert(ref key, ref desiredValue, ref userContext, ref context);
var status = default(Status);
Status status;

if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
Expand All @@ -436,7 +457,7 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
{
var context = default(PendingContext);
var internalStatus = InternalRMW(ref key, ref input, ref userContext, ref context);
var status = default(Status);
Status status;
if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND)
{
status = (Status)internalStatus;
Expand Down Expand Up @@ -491,6 +512,7 @@ public void Dispose()
threadCtx.Dispose();
prevThreadCtx.Dispose();
hlog.Dispose();
readcache?.Dispose();
}
}
}
8 changes: 4 additions & 4 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ internal OperationStatus InternalRead(
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = threadCtx.Value.version;
pendingContext.serialNum = threadCtx.Value.serialNum + 1;
pendingContext.serialNum = threadCtx.Value.serialNum;
}
#endregion

Expand Down Expand Up @@ -591,7 +591,7 @@ internal OperationStatus InternalUpsert(
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = threadCtx.Value.version;
pendingContext.serialNum = threadCtx.Value.serialNum + 1;
pendingContext.serialNum = threadCtx.Value.serialNum;
}
#endregion

Expand Down Expand Up @@ -920,7 +920,7 @@ ref hlog.GetValue(physicalAddress),
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = threadCtx.Value.version;
pendingContext.serialNum = threadCtx.Value.serialNum + 1;
pendingContext.serialNum = threadCtx.Value.serialNum;
}
#endregion

Expand Down Expand Up @@ -1613,7 +1613,7 @@ internal OperationStatus InternalDelete(
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = threadCtx.Value.version;
pendingContext.serialNum = threadCtx.Value.serialNum + 1;
pendingContext.serialNum = threadCtx.Value.serialNum;
}
#endregion

Expand Down
9 changes: 5 additions & 4 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ internal void InternalRefresh()

// We check if we are in normal mode
var newPhaseInfo = SystemState.Copy(ref _systemState);
if (threadCtx.Value.phase == Phase.REST && newPhaseInfo.phase == Phase.REST)
if (threadCtx.Value.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && threadCtx.Value.version == newPhaseInfo.version)
{
return;
}
Expand Down Expand Up @@ -276,11 +276,12 @@ internal void CompleteIOPendingRequests(FasterExecutionContext context)
var threadCtxCopy = threadCtx.Value;

// Suspend epoch
epoch.Suspend();
SuspendSession();

request = await context.readyResponses.DequeueAsync(token);
// Restore context
SetContext(prevThreadCtxCopy, threadCtxCopy);

// Resume session
ResumeSession(prevThreadCtxCopy, threadCtxCopy);
}

InternalContinuePendingRequestAndCallback(context, request);
Expand Down
Loading

0 comments on commit 2eafe9b

Please sign in to comment.