diff --git a/cs/src/core/ClientSession/FASTERClientSession.cs b/cs/src/core/ClientSession/FASTERClientSession.cs index 2e38ffb04..f5df13224 100644 --- a/cs/src/core/ClientSession/FASTERClientSession.cs +++ b/cs/src/core/ClientSession/FASTERClientSession.cs @@ -13,7 +13,7 @@ public unsafe partial class FasterKV : FasterBase, IFasterKV _activeSessions; + internal Dictionary _activeSessions = new Dictionary(); /// /// Client session type helper @@ -109,7 +109,7 @@ public ClientSession NewSession(), null); @@ -143,6 +143,7 @@ public ClientSessionPrior commit point of durability for session /// For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward. /// Session instance + public ClientSession ResumeSession(Functions functions, string sessionId, out CommitPoint commitPoint, bool threadAffinitized = false) where Functions : IFunctions { @@ -156,6 +157,7 @@ public ClientSession ResumeSessio if (commitPoint.UntilSerialNo == -1) throw new Exception($"Unable to find session {sessionId} to recover"); + var session = new ClientSession(this, ctx, functions, !threadAffinitized); if (_activeSessions == null) @@ -176,4 +178,4 @@ internal void DisposeClientSession(string guid) _activeSessions.Remove(guid); } } -} +} \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index fa6731540..c5d86bb5f 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -87,7 +87,10 @@ internal void InternalRefresh(FasterExecu internal void InitContext(FasterExecutionContext ctx, string token, long lsn = -1) { ctx.phase = Phase.REST; - ctx.version = systemState.version; + // The system version starts at 1. Because we do not know what the current state machine state is, + // we need to play it safe and initialize context behind the system state. Otherwise the session may + // never "catch up" with the rest of the system when stepping through the state machine as it is ahead. + ctx.version = 1; ctx.markers = new bool[8]; ctx.serialNum = lsn; ctx.guid = token; diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index ced26a97a..6ccc8695f 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -42,14 +42,12 @@ public virtual void GlobalBeforeEnteringState(SystemState next, Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length); } - if (faster._activeSessions != null) - { + // Temporarily block new sessions from starting, which may add an entry to the table and resize the + // dictionary. There should be minimal contention here. + lock (faster._activeSessions) // write dormant sessions to checkpoint foreach (var kvp in faster._activeSessions) - { kvp.Value.AtomicSwitch(next.version - 1); - } - } faster.WriteHybridLogMetaInfo(); break;