Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 4, 2019
1 parent 68df34d commit b16212f
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 72 deletions.
113 changes: 73 additions & 40 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : F
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/// <summary>
/// Whether we are in relaxed CPR mode, where IO pending ops are not
/// part of the CPR checkpoint
/// </summary>
public bool RelaxedCPR = false;

/// <summary>
/// Complete outstanding pending operations
/// </summary>
Expand All @@ -25,34 +31,42 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
{
bool done = true;

#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
if (!RelaxedCPR)
{
#region Previous pending requests
if (clientSession.ctx.phase == Phase.IN_PROGRESS
||
clientSession.ctx.phase == Phase.WAIT_PENDING)
{

await CompleteIOPendingRequestsAsync(prevThreadCtx.Value, clientSession);
Debug.Assert(prevThreadCtx.Value.ioPendingRequests.Count == 0);
await CompleteIOPendingRequestsAsync(clientSession.prevCtx, clientSession);
Debug.Assert(clientSession.prevCtx.ioPendingRequests.Count == 0);

CompleteRetryRequests(prevThreadCtx.Value);
if (clientSession.prevCtx.retryRequests.Count > 0)
{
clientSession.ResumeThread();
CompleteRetryRequests(clientSession.prevCtx, clientSession);
clientSession.SuspendThread();
}

done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
done &= (clientSession.prevCtx.ioPendingRequests.Count == 0);
done &= (clientSession.prevCtx.retryRequests.Count == 0);
}
#endregion
}
#endregion

if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
if (RelaxedCPR || (!(clientSession.ctx.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING))
clientSession.ctx.phase == Phase.WAIT_PENDING)))
{
await CompleteIOPendingRequestsAsync(threadCtx.Value, clientSession);
Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0);
await CompleteIOPendingRequestsAsync(clientSession.ctx, clientSession);
Debug.Assert(clientSession.ctx.ioPendingRequests.Count == 0);
}

CompleteRetryRequests(threadCtx.Value);
CompleteRetryRequests(clientSession.ctx, clientSession);

done &= (threadCtx.Value.ioPendingRequests.Count == 0);
done &= (threadCtx.Value.retryRequests.Count == 0);
done &= (clientSession.ctx.ioPendingRequests.Count == 0);
done &= (clientSession.ctx.retryRequests.Count == 0);

if (!done)
{
Expand Down Expand Up @@ -89,6 +103,8 @@ internal async ValueTask CompleteCheckpointAsync(ClientSession<Key, Value, Input
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal async ValueTask InternalRefreshAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
clientSession.ResumeThread();

epoch.ProtectAndDrain();
epoch.Check();

Expand All @@ -107,7 +123,7 @@ internal async ValueTask InternalRefreshAsync(ClientSession<Key, Value, Input, O
}

await HandleCheckpointingPhasesAsync(clientSession);
clientSession.ResumeThread();
clientSession.SuspendThread();
}


Expand Down Expand Up @@ -166,12 +182,12 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
{
if (!threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt])
{
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true;
}
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
break;
}
case Phase.INDEX_CHECKPOINT:
Expand All @@ -196,21 +212,24 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
{
if (!threadCtx.Value.markers[EpochPhaseIdx.Prepare])
{
// Thread local action
AcquireSharedLatchesForAllPendingRequests();
if (!RelaxedCPR)
{
AcquireSharedLatchesForAllPendingRequests();
}

var idx = Interlocked.Increment(ref _hybridLogCheckpoint.info.numThreads);
idx -= 1;

_hybridLogCheckpoint.info.guids[idx] = threadCtx.Value.guid;

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}

threadCtx.Value.markers[EpochPhaseIdx.Prepare] = true;
}

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}

break;
}
case Phase.IN_PROGRESS:
Expand All @@ -231,12 +250,15 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
AtomicSwitch(threadCtx.Value, prevThreadCtx.Value, ctx.version);
InitContext(threadCtx.Value, prevThreadCtx.Value.guid);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
// Has to be prevThreadCtx, not ctx
prevThreadCtx.Value.markers[EpochPhaseIdx.InProgress] = true;
}

// Has to be prevThreadCtx, not ctx
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
break;
}
case Phase.WAIT_PENDING:
Expand All @@ -254,7 +276,13 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
}
prevThreadCtx.Value.markers[EpochPhaseIdx.WaitPending] = true;
}

}
else
{
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitPending, threadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
}
break;
}
Expand Down Expand Up @@ -309,6 +337,13 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
prevThreadCtx.Value.markers[EpochPhaseIdx.WaitFlush] = true;
}
}
else
{
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
}
break;
}

Expand All @@ -318,14 +353,12 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
{
// Thread local action
functions.CheckpointCompletionCallback(threadCtx.Value.guid, prevThreadCtx.Value.serialNum);

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}

prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
}
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
{
GlobalMoveToNextCheckpointState(currentState);
}
break;
}
case Phase.REST:
Expand Down
6 changes: 6 additions & 0 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession()
{
// We have to use relaxed CPR with async client sessions
this.RelaxedCPR = true;

Guid guid = Guid.NewGuid();
var ctx = new FasterExecutionContext();
InitContext(ctx, guid);
Expand All @@ -47,6 +50,9 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientS
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ContinueClientSession(Guid guid, out long lsn)
{
// We have to use relaxed CPR with async client sessions
this.RelaxedCPR = true;

lsn = InternalContinue(guid);
if (lsn == -1)
throw new Exception($"Unable to find session {guid} to recover");
Expand Down
106 changes: 76 additions & 30 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,24 @@ internal void InitContext(FasterExecutionContext ctx, Guid token)
ctx.version = _systemState.version;
ctx.markers = new bool[8];
ctx.serialNum = 0;
ctx.totalPending = 0;
ctx.guid = token;
ctx.retryRequests = new Queue<PendingContext>();
ctx.readyResponses = new AsyncQueue<AsyncIOContext<Key, Value>>();
ctx.ioPendingRequests = new Dictionary<long, PendingContext>();

if (RelaxedCPR)
{
if (ctx.retryRequests == null)
{
ctx.retryRequests = new Queue<PendingContext>();
ctx.readyResponses = new AsyncQueue<AsyncIOContext<Key, Value>>();
ctx.ioPendingRequests = new Dictionary<long, PendingContext>();
}
}
else
{
ctx.totalPending = 0;
ctx.retryRequests = new Queue<PendingContext>();
ctx.readyResponses = new AsyncQueue<AsyncIOContext<Key, Value>>();
ctx.ioPendingRequests = new Dictionary<long, PendingContext>();
}
}

internal void CopyContext(FasterExecutionContext src, FasterExecutionContext dst)
Expand All @@ -147,11 +160,14 @@ internal void CopyContext(FasterExecutionContext src, FasterExecutionContext dst
dst.version = src.version;
dst.markers = src.markers;
dst.serialNum = src.serialNum;
dst.totalPending = src.totalPending;
dst.guid = src.guid;
dst.retryRequests = src.retryRequests;
dst.readyResponses = src.readyResponses;
dst.ioPendingRequests = src.ioPendingRequests;
if (!RelaxedCPR)
{
dst.totalPending = src.totalPending;
dst.retryRequests = src.retryRequests;
dst.readyResponses = src.readyResponses;
dst.ioPendingRequests = src.ioPendingRequests;
}
}

internal bool InternalCompletePending(bool wait = false)
Expand All @@ -160,19 +176,22 @@ internal bool InternalCompletePending(bool wait = false)
{
bool done = true;

#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
if (!RelaxedCPR)
{
CompleteIOPendingRequests(prevThreadCtx.Value);
Refresh();
CompleteRetryRequests(prevThreadCtx.Value);
#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
{
CompleteIOPendingRequests(prevThreadCtx.Value);
Refresh();
CompleteRetryRequests(prevThreadCtx.Value);

done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
}
#endregion
}
#endregion

if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
||
Expand Down Expand Up @@ -204,13 +223,31 @@ internal bool InternalCompletePending(bool wait = false)
internal void CompleteRetryRequests(FasterExecutionContext context)
{
int count = context.retryRequests.Count;

if (count == 0) return;

for (int i = 0; i < count; i++)
{
var pendingContext = context.retryRequests.Dequeue();
InternalRetryRequestAndCallback(context, pendingContext);
}
}

internal void CompleteRetryRequests(FasterExecutionContext context, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, CancellationToken token = default(CancellationToken))
{
int count = context.retryRequests.Count;

if (count == 0) return;

clientSession.ResumeThread();
for (int i = 0; i < count; i++)
{
var pendingContext = context.retryRequests.Dequeue();
InternalRetryRequestAndCallback(context, pendingContext);
}
clientSession.SuspendThread();
}

internal void CompleteIOPendingRequests(FasterExecutionContext context)
{
if (context.readyResponses.Count == 0) return;
Expand Down Expand Up @@ -258,15 +295,20 @@ internal void InternalRetryRequestAndCallback(
ref Key key = ref pendingContext.key.Get();
ref Value value = ref pendingContext.value.Get();

#region Entry latch operation
var handleLatches = false;
if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
||
(threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
bool handleLatches = false;

if (!RelaxedCPR)
{
handleLatches = true;
#region Entry latch operation

if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
||
(threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
{
handleLatches = true;
}
#endregion
}
#endregion

// Issue retry command
switch(pendingContext.type)
Expand Down Expand Up @@ -333,12 +375,16 @@ internal void InternalContinuePendingRequestAndCallback(
FasterExecutionContext ctx,
AsyncIOContext<Key, Value> request)
{
var handleLatches = false;
if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
||
(threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
bool handleLatches = false;

if (!RelaxedCPR)
{
handleLatches = true;
if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
||
(threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
{
handleLatches = true;
}
}

if (ctx.ioPendingRequests.TryGetValue(request.id, out PendingContext pendingContext))
Expand Down
Loading

0 comments on commit b16212f

Please sign in to comment.