Skip to content

Commit

Permalink
Fix CPR state machine issues when a new thread affinitized sessions s…
Browse files Browse the repository at this point in the history
…tarts during an active checkpoint. (#280)

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
tli2 and badrishc authored Jul 10, 2020
1 parent 1e72025 commit 07f8929
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
8 changes: 5 additions & 3 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Va
where Key : new()
where Value : new()
{
internal Dictionary<string, IClientSession> _activeSessions;
internal Dictionary<string, IClientSession> _activeSessions = new Dictionary<string, IClientSession>();

/// <summary>
/// Client session type helper
Expand Down Expand Up @@ -109,7 +109,7 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession<I
prevCtx.version--;

ctx.prevCtx = prevCtx;

if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<string, IClientSession>(), null);

Expand Down Expand Up @@ -143,6 +143,7 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="commitPoint">Prior commit point of durability for session</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>

public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession<Input, Output, Context, Functions>(Functions functions, string sessionId, out CommitPoint commitPoint, bool threadAffinitized = false)
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
Expand All @@ -156,6 +157,7 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSessio
if (commitPoint.UntilSerialNo == -1)
throw new Exception($"Unable to find session {sessionId} to recover");


var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, functions, !threadAffinitized);

if (_activeSessions == null)
Expand All @@ -176,4 +178,4 @@ internal void DisposeClientSession(string guid)
_activeSessions.Remove(guid);
}
}
}
}
5 changes: 4 additions & 1 deletion cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ internal void InternalRefresh<Input, Output, Context, FasterSession>(FasterExecu
internal void InitContext<Input, Output, Context>(FasterExecutionContext<Input, Output, Context> ctx, string token, long lsn = -1)
{
ctx.phase = Phase.REST;
ctx.version = systemState.version;
// The system version starts at 1. Because we do not know what the current state machine state is,
// we need to play it safe and initialize context behind the system state. Otherwise the session may
// never "catch up" with the rest of the system when stepping through the state machine as it is ahead.
ctx.version = 1;
ctx.markers = new bool[8];
ctx.serialNum = lsn;
ctx.guid = token;
Expand Down
8 changes: 3 additions & 5 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ public virtual void GlobalBeforeEnteringState<Key, Value>(SystemState next,
Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length);
}

if (faster._activeSessions != null)
{
// Temporarily block new sessions from starting, which may add an entry to the table and resize the
// dictionary. There should be minimal contention here.
lock (faster._activeSessions)
// write dormant sessions to checkpoint
foreach (var kvp in faster._activeSessions)
{
kvp.Value.AtomicSwitch(next.version - 1);
}
}

faster.WriteHybridLogMetaInfo();
break;
Expand Down

0 comments on commit 07f8929

Please sign in to comment.