Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CPR Checkpoint Bug fix #280

Merged
merged 3 commits into from
Jul 10, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
@@ -9,20 +9,23 @@

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context, Functions>
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase,
IFasterKV<Key, Value, Input, Output, Context, Functions>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
internal Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>> _activeSessions;
internal Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>> _activeSessions =
new Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>>();

/// <summary>
/// Start a new client session with FASTER.
/// </summary>
/// <param name="sessionId">ID/name of session (auto-generated if not provided)</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionId = null, bool threadAffinitized = false)
public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(string sessionId = null,
bool threadAffinitized = false)
{
if (!threadAffinitized)
UseRelaxedCPR();
@@ -37,10 +40,8 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(s

ctx.prevCtx = prevCtx;

if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>>(), null);

var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);
var session =
new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);
lock (_activeSessions)
_activeSessions.Add(sessionId, session);
return session;
@@ -54,7 +55,8 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> NewSession(s
/// <param name="commitPoint">Prior commit point of durability for session</param>
/// <param name="threadAffinitized">For advanced users. Specifies whether session holds the thread epoch across calls. Do not use with async code. Ensure thread calls session Refresh periodically to move the system epoch forward.</param>
/// <returns>Session instance</returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionId, out CommitPoint commitPoint, bool threadAffinitized = false)
public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession(string sessionId,
out CommitPoint commitPoint, bool threadAffinitized = false)
{
if (!threadAffinitized)
UseRelaxedCPR();
@@ -63,10 +65,9 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSessio
if (commitPoint.UntilSerialNo == -1)
throw new Exception($"Unable to find session {sessionId} to recover");

var session = new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);
var session =
new ClientSession<Key, Value, Input, Output, Context, Functions>(this, ctx, !threadAffinitized);

if (_activeSessions == null)
Interlocked.CompareExchange(ref _activeSessions, new Dictionary<string, ClientSession<Key, Value, Input, Output, Context, Functions>>(), null);
lock (_activeSessions)
_activeSessions.Add(sessionId, session);
return session;
@@ -83,4 +84,4 @@ internal void DisposeClientSession(string guid)
_activeSessions.Remove(guid);
}
}
}
}
5 changes: 4 additions & 1 deletion cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
@@ -88,7 +88,10 @@ internal void InternalRefresh(FasterExecutionContext ctx, ClientSession<Key, Val
internal void InitContext(FasterExecutionContext ctx, string token, long lsn = -1)
{
ctx.phase = Phase.REST;
ctx.version = _systemState.version;
// The system version starts at 1. Because we do not know what the current state machine state is,
// we need to play it safe and initialize context behind the system state. Otherwise the session may
// never "catch up" with the rest of the system when stepping through the state machine as it is ahead.
ctx.version = 1;
ctx.markers = new bool[8];
ctx.serialNum = lsn;
ctx.guid = token;
8 changes: 3 additions & 5 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
@@ -43,14 +43,12 @@ public virtual void GlobalBeforeEnteringState<Key, Value, Input, Output, Context
Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length);
}

if (faster._activeSessions != null)
{
// Temporarily block new sessions from starting, which may add an entry to the table and resize the
// dictionary. There should be minimal contention here.
lock (faster._activeSessions)
// write dormant sessions to checkpoint
foreach (var kvp in faster._activeSessions)
{
faster.AtomicSwitch(kvp.Value.ctx, kvp.Value.ctx.prevCtx, next.version - 1);
}
}

faster.WriteHybridLogMetaInfo();
break;