Skip to content

Commit

Permalink
[C#] Checkpointing tests (#373)
Browse files Browse the repository at this point in the history
* Fix thread continuing in different checkpoint state machine.
* Added epoch protection to ContainsKeyInMemory.
* Another bugfix
* Added debug assert to ensure epoch protection correctness.
  • Loading branch information
badrishc authored Nov 25, 2020
1 parent 58cbb5e commit 3853e88
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 39 deletions.
18 changes: 0 additions & 18 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -602,24 +602,6 @@ protected void Initialize(long firstValidAddress)
TailPageOffset.Offset = (int)(firstValidAddress & PageSizeMask);
}

/// <summary>
/// Acquire thread
/// </summary>
public void Acquire()
{
if (ownedEpoch)
epoch.Resume();
}

/// <summary>
/// Release thread
/// </summary>
public void Release()
{
if (ownedEpoch)
epoch.Suspend();
}

/// <summary>
/// Dispose allocator
/// </summary>
Expand Down
10 changes: 9 additions & 1 deletion cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,15 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
/// <returns>Status</returns>
internal Status ContainsKeyInMemory(ref Key key, long fromAddress = -1)
{
return fht.InternalContainsKeyInMemory(ref key, ctx, FasterSession, fromAddress);
if (SupportAsync) UnsafeResumeThread();
try
{
return fht.InternalContainsKeyInMemory(ref key, ctx, FasterSession, fromAddress);
}
finally
{
if (SupportAsync) UnsafeSuspendThread();
}
}

/// <summary>
Expand Down
7 changes: 6 additions & 1 deletion cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ private void Acquire()
{
if (threadEntryIndex == kInvalidIndex)
threadEntryIndex = ReserveEntryForThread();

Debug.Assert((*(tableAligned + threadEntryIndex)).localCurrentEpoch == 0,
"Trying to acquire protected epoch. Make sure you do not re-enter FASTER from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");

threadEntryIndexCount++;
}

Expand All @@ -243,7 +247,8 @@ private void Release()
{
int entry = threadEntryIndex;

Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0, "Trying to release unprotected epoch");
Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0,
"Trying to release unprotected epoch. Make sure you do not re-enter FASTER from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");

(*(tableAligned + entry)).localCurrentEpoch = 0;
(*(tableAligned + entry)).threadId = 0;
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ internal sealed class FasterExecutionContext<Input, Output, Context> : Serialize
public AsyncQueue<AsyncIOContext<Key, Value>> readyResponses;
public List<long> excludedSerialNos;
public int asyncPendingCount;
public ISynchronizationStateMachine threadStateMachine;

public int SyncIoPendingCount => ioPendingRequests.Count - asyncPendingCount;

Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ internal void CopyContext<Input, Output, Context>(FasterExecutionContext<Input,
{
dst.phase = src.phase;
dst.version = src.version;
dst.threadStateMachine = src.threadStateMachine;
dst.markers = src.markers;
dst.serialNum = src.serialNum;
dst.guid = src.guid;
Expand Down
37 changes: 27 additions & 10 deletions cs/src/core/Index/Synchronization/FasterStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,20 @@ private SystemState FastForwardToCurrentCycle(SystemState threadState, SystemSta
}

/// <summary>
/// Check whether threadState is in same cycle compared to current systemState
/// Check whether thread is in same cycle compared to current systemState
/// </summary>
/// <param name="ctx"></param>
/// <param name="threadState"></param>
/// <returns></returns>
internal bool SameCycle(SystemState threadState)
internal bool SameCycle<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> ctx, SystemState threadState)
{
var _systemState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref _systemState);
return StartOfCurrentCycle(threadState).version == StartOfCurrentCycle(_systemState).version;
if (ctx == null)
{
var _systemState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref _systemState);
return StartOfCurrentCycle(threadState).version == StartOfCurrentCycle(_systemState).version;
}
return ctx.threadStateMachine == currentSyncStateMachine;
}

/// <summary>
Expand Down Expand Up @@ -170,7 +175,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
fasterSession?.CheckpointCompletionCallback(ctx.guid, commitPoint);
}
}
if ((ctx.version == targetStartState.version) && (ctx.phase < Phase.REST))
if ((ctx.version == targetStartState.version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
{
IssueCompletionCallback(ctx, fasterSession);
}
Expand All @@ -185,6 +190,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
{
ctx.phase = targetState.phase;
ctx.version = targetState.version;
ctx.threadStateMachine = currentTask;
}
return;
}
Expand All @@ -193,9 +199,20 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
// We start at either the start point or our previous position in the state machine.
// If we are calling from somewhere other than an execution thread (e.g. waiting on
// a checkpoint to complete on a client app thread), we start at current system state
var threadState =
ctx == null ? targetState :
FastForwardToCurrentCycle(currentState, targetStartState);
var threadState = targetState;

if (ctx != null)
{
if (ctx.threadStateMachine == currentTask)
{
threadState = currentState;
}
else
{
threadState = targetStartState;
ctx.threadStateMachine = currentTask;
}
}

var previousState = threadState;
do
Expand All @@ -212,7 +229,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
{
ctx.phase = threadState.phase;
ctx.version = threadState.version;
}
}

previousState.word = threadState.word;
threadState = currentTask.NextState(threadState);
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public override void OnThreadState<Key, Value, Input, Output, Context, FasterSes
var s = faster._hybridLogCheckpoint.flushedSemaphore;

var notify = faster.hlog.FlushedUntilAddress >= faster._hybridLogCheckpoint.info.finalLogicalAddress;
notify = notify || !faster.SameCycle(current) || s == null;
notify = notify || !faster.SameCycle(ctx, current) || s == null;

if (valueTasks != null && !notify)
{
Expand Down Expand Up @@ -218,7 +218,7 @@ public override void OnThreadState<Key, Value, Input, Output, Context, FasterSes
var s = faster._hybridLogCheckpoint.flushedSemaphore;

var notify = s != null && s.CurrentCount > 0;
notify = notify || !faster.SameCycle(current) || s == null;
notify = notify || !faster.SameCycle(ctx, current) || s == null;

if (valueTasks != null && !notify)
{
Expand Down
10 changes: 6 additions & 4 deletions cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
break;

case Phase.WAIT_INDEX_CHECKPOINT:
case Phase.WAIT_INDEX_ONLY_CHECKPOINT:
break;

case Phase.REST:
Expand Down Expand Up @@ -74,13 +75,14 @@ public void OnThreadState<Key, Value, Input, Output, Context, FasterSession>(
faster.GlobalStateMachineStep(current);
break;
case Phase.WAIT_INDEX_CHECKPOINT:
case Phase.WAIT_INDEX_ONLY_CHECKPOINT:
var notify = faster.IsIndexFuzzyCheckpointCompleted();
notify = notify || !faster.SameCycle(current);
notify = notify || !faster.SameCycle(ctx, current);

if (valueTasks != null && !notify)
{
var t = faster.IsIndexFuzzyCheckpointCompletedAsync(token);
if (!faster.SameCycle(current))
if (!faster.SameCycle(ctx, current))
notify = true;
else
valueTasks.Add(t);
Expand Down Expand Up @@ -115,9 +117,9 @@ public override SystemState NextState(SystemState start)
result.phase = Phase.PREP_INDEX_CHECKPOINT;
break;
case Phase.PREP_INDEX_CHECKPOINT:
result.phase = Phase.WAIT_INDEX_CHECKPOINT;
result.phase = Phase.WAIT_INDEX_ONLY_CHECKPOINT;
break;
case Phase.WAIT_INDEX_CHECKPOINT:
case Phase.WAIT_INDEX_ONLY_CHECKPOINT:
result.phase = Phase.REST;
break;
default:
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Index/Synchronization/StateTransitions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ internal enum Phase : int {
WAIT_FLUSH,
PERSISTENCE_CALLBACK,
REST,
PREP_INDEX_CHECKPOINT,
PREP_INDEX_CHECKPOINT,
WAIT_INDEX_ONLY_CHECKPOINT,
PREPARE,
PREPARE_GROW,
IN_PROGRESS_GROW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void OnThreadState<Key, Value, Input, Output, Context, FasterSession>(
// Need to be very careful here as threadCtx is changing
var _ctx = prev.phase == Phase.IN_PROGRESS ? ctx.prevCtx : ctx;
var tokens = faster._hybridLogCheckpoint.info.checkpointTokens;
if (!faster.SameCycle(current) || tokens == null)
if (!faster.SameCycle(ctx, current) || tokens == null)
return;

if (!_ctx.markers[EpochPhaseIdx.InProgress])
Expand Down
2 changes: 1 addition & 1 deletion cs/test/ObjectRecoveryTest3.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void DeleteDirectory(string path)
[Test]
public async ValueTask ObjectRecoveryTest3(
[Values]CheckpointType checkpointType,
[Values(5000)] int iterations,
[Values(2000)] int iterations,
[Values]bool isAsync)
{
this.iterations = iterations;
Expand Down

0 comments on commit 3853e88

Please sign in to comment.