Skip to content

Commit

Permalink
Wired in excluded serial nos for commit points.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 5, 2019
1 parent 94b901e commit 9afc5fb
Show file tree
Hide file tree
Showing 30 changed files with 155 additions and 93 deletions.
5 changes: 3 additions & 2 deletions cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Runtime.CompilerServices;
using System.Diagnostics;
using FASTER.core;
using System.Collections.Generic;

namespace FASTER.benchmark
{
Expand All @@ -28,9 +29,9 @@ public void DeleteCompletionCallback(ref Key key, Empty ctx)
{
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, serialNum);
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassCache/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public bool InPlaceUpdater(ref CacheKey key, ref CacheInput input, ref CacheValu
throw new NotImplementedException();
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
throw new NotImplementedException();
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassCacheMT/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public bool InPlaceUpdater(ref CacheKey key, ref CacheInput input, ref CacheValu
throw new NotImplementedException();
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
throw new NotImplementedException();
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/ClassSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void ReadCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutpu
public void UpsertCompletionCallback(ref MyKey key, ref MyValue value, MyContext ctx) { }
public void RMWCompletionCallback(ref MyKey key, ref MyInput input, MyContext ctx, Status status) { }
public void DeleteCompletionCallback(ref MyKey key, MyContext ctx) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
}

class Program
Expand Down
3 changes: 2 additions & 1 deletion cs/playground/FixedLenStructSample/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using System;
using System.Collections.Generic;

namespace FixedLenStructSample
{
Expand All @@ -11,7 +12,7 @@ namespace FixedLenStructSample
/// </summary>
public class FixedLenFunctions : IFunctions<FixedLenKey, FixedLenValue, string, string, Empty>
{
public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
}

Expand Down
2 changes: 1 addition & 1 deletion cs/playground/PeriodicCompaction/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public bool InPlaceUpdater(ref CacheKey key, ref CacheInput input, ref CacheValu
throw new NotImplementedException();
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
throw new NotImplementedException();
}
Expand Down
5 changes: 3 additions & 2 deletions cs/playground/StructSample/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;

Expand Down Expand Up @@ -31,7 +32,7 @@ public void ReadCompletionCallback(ref long key, ref long input, ref long output
public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { }
public void DeleteCompletionCallback(ref long key, Empty ctx) { }
public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
}


Expand Down Expand Up @@ -71,6 +72,6 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp
public void UpsertCompletionCallback(ref Key key, ref Value output, Empty ctx) { }
public void DeleteCompletionCallback(ref Key key, Empty ctx) { }
public void RMWCompletionCallback(ref Key key, ref Input output, Empty ctx, Status status) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
}
}
5 changes: 3 additions & 2 deletions cs/playground/StructSampleCore/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;

Expand Down Expand Up @@ -31,7 +32,7 @@ public void ReadCompletionCallback(ref long key, ref long input, ref long output
public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { }
public void DeleteCompletionCallback(ref long key, Empty ctx) { }
public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
}


Expand Down Expand Up @@ -71,6 +72,6 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp
public void UpsertCompletionCallback(ref Key key, ref Value output, Empty ctx) { }
public void DeleteCompletionCallback(ref Key key, Empty ctx) { }
public void RMWCompletionCallback(ref Key key, ref Input output, Empty ctx, Status status) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint) { }
}
}
4 changes: 2 additions & 2 deletions cs/playground/SumStore/RecoveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void RunThread(int threadId, bool continueSession)
// Register with thread
do
{
sno = fht.ContinueSession(sessionGuid);
sno = fht.ContinueSession(sessionGuid).UntilSerialNo;
} while (sno == -1);
Console.WriteLine("Session {0} recovered until {1}", sessionGuid, sno);
sno++;
Expand Down Expand Up @@ -195,7 +195,7 @@ private void Test()
// Register with thread
long _sno = -1;
do {
_sno = fht.ContinueSession(sessionGuid);
_sno = fht.ContinueSession(sessionGuid).UntilSerialNo;
} while (_sno == -1);
sno.Add(_sno);

Expand Down
4 changes: 2 additions & 2 deletions cs/playground/SumStore/SumStoreTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public void DeleteCompletionCallback(ref AdId key, Empty ctx)
{
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
Console.WriteLine("Session {0} reports persistence until {1}", sessionId, serialNum);
Console.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
5 changes: 3 additions & 2 deletions cs/playground/VarLenStructSample/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using FASTER.core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;

Expand Down Expand Up @@ -43,9 +44,9 @@ public void DeleteCompletionCallback(ref VarLenType key, Empty ctx)
{
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(Guid sessionId, CommitPoint commitPoint)
{
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, serialNum);
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
20 changes: 12 additions & 8 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : F
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/// <summary>
/// Whether we are in relaxed CPR mode, where IO pending ops are not
/// part of the CPR checkpoint
/// </summary>
public bool RelaxedCPR = false;

/// <summary>
/// Complete outstanding pending operations
/// </summary>
Expand Down Expand Up @@ -119,7 +113,12 @@ private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext
if (toCtx.version < version)
{
CopyContext(fromCtx, toCtx);
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(toCtx.guid, toCtx.serialNum);
_hybridLogCheckpoint.info.checkpointTokens.TryAdd(toCtx.guid,
new CommitPoint
{
UntilSerialNo = toCtx.serialNum,
ExcludedSerialNos = toCtx.excludedSerialNos
});
return true;
}
}
Expand Down Expand Up @@ -336,7 +335,12 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
if (!prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback])
{
// Thread local action
functions.CheckpointCompletionCallback(threadCtx.Value.guid, prevThreadCtx.Value.serialNum);
functions.CheckpointCompletionCallback(threadCtx.Value.guid,
new CommitPoint
{
UntilSerialNo = prevThreadCtx.Value.serialNum,
ExcludedSerialNos = prevThreadCtx.Value.excludedSerialNos
});
prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
}
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
Expand Down
12 changes: 6 additions & 6 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functio
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession()
{
// We have to use relaxed CPR with async client sessions
this.RelaxedCPR = true;
UseRelaxedCPR();

Guid guid = Guid.NewGuid();
var ctx = new FasterExecutionContext();
Expand All @@ -46,15 +46,15 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientS
/// Continue session with FASTER
/// </summary>
/// <param name="guid"></param>
/// <param name="lsn"></param>
/// <param name="cp"></param>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> ContinueClientSession(Guid guid, out long lsn)
public ClientSession<Key, Value, Input, Output, Context, Functions> ContinueClientSession(Guid guid, out CommitPoint cp)
{
// We have to use relaxed CPR with async client sessions
this.RelaxedCPR = true;
UseRelaxedCPR();

lsn = InternalContinue(guid);
if (lsn == -1)
cp = InternalContinue(guid);
if (cp.UntilSerialNo == -1)
throw new Exception($"Unable to find session {guid} to recover");

var prevCtx = this.prevThreadCtx.Value;
Expand Down
50 changes: 41 additions & 9 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,27 @@ internal class FasterExecutionContext : SerializedFasterExecutionContext
public Queue<PendingContext> retryRequests;
public Dictionary<long, PendingContext> ioPendingRequests;
public AsyncQueue<AsyncIOContext<Key, Value>> readyResponses;
public List<long> excludedSerialNos;
}
}


/// <summary>
/// Descriptor for a CPR commit point
/// </summary>
public struct CommitPoint
{
/// <summary>
/// Serial number until which we have committed
/// </summary>
public long UntilSerialNo;

/// <summary>
/// List of operation serial nos excluded from commit
/// </summary>
public List<long> ExcludedSerialNos;
}

/// <summary>
/// Recovery info for hybrid log
/// </summary>
Expand Down Expand Up @@ -154,14 +171,14 @@ public struct HybridLogRecoveryInfo
public Guid[] guids;

/// <summary>
/// Tokens per guid restored during Continue
/// Commit tokens per guid restored during Continue
/// </summary>
public ConcurrentDictionary<Guid, long> continueTokens;
public ConcurrentDictionary<Guid, CommitPoint> continueTokens;

/// <summary>
/// Tokens per guid created during Checkpoint
/// Commit tokens per guid created during Checkpoint
/// </summary>
public ConcurrentDictionary<Guid, long> checkpointTokens;
public ConcurrentDictionary<Guid, CommitPoint> checkpointTokens;

/// <summary>
/// Object log segment offsets
Expand All @@ -184,8 +201,10 @@ public void Initialize(Guid token, int _version)
finalLogicalAddress = 0;
headAddress = 0;
guids = new Guid[LightEpoch.kTableSize + 1];
continueTokens = new ConcurrentDictionary<Guid, long>();
checkpointTokens = new ConcurrentDictionary<Guid, long>();

continueTokens = new ConcurrentDictionary<Guid, CommitPoint>();
checkpointTokens = new ConcurrentDictionary<Guid, CommitPoint>();

objectLogSegmentOffsets = null;
}

Expand All @@ -196,7 +215,7 @@ public void Initialize(Guid token, int _version)
public void Initialize(StreamReader reader)
{
guids = new Guid[LightEpoch.kTableSize + 1];
continueTokens = new ConcurrentDictionary<Guid, long>();
continueTokens = new ConcurrentDictionary<Guid, CommitPoint>();

string value = reader.ReadLine();
guid = Guid.Parse(value);
Expand Down Expand Up @@ -231,7 +250,17 @@ public void Initialize(StreamReader reader)
guids[i] = Guid.Parse(value);
value = reader.ReadLine();
var serialno = long.Parse(value);
continueTokens.TryAdd(guids[i], serialno);

var exclusions = new List<long>();
var exclusionCount = int.Parse(reader.ReadLine());
for (int j = 0; j < exclusionCount; j++)
exclusions.Add(long.Parse(reader.ReadLine()));

continueTokens.TryAdd(guids[i], new CommitPoint
{
UntilSerialNo = serialno,
ExcludedSerialNos = exclusions
});
}

// Read object log segment offsets
Expand Down Expand Up @@ -294,7 +323,10 @@ public byte[] ToByteArray()
foreach (var kvp in checkpointTokens)
{
writer.WriteLine(kvp.Key);
writer.WriteLine(kvp.Value);
writer.WriteLine(kvp.Value.UntilSerialNo);
writer.WriteLine(kvp.Value.ExcludedSerialNos.Count);
foreach (long item in kvp.Value.ExcludedSerialNos)
writer.WriteLine(item);
}

// Write object log segment offsets
Expand Down
14 changes: 11 additions & 3 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : F
private readonly bool CopyReadsToTail = false;
private readonly bool FoldOverSnapshot = false;
private readonly int sectorSize;

private readonly bool WriteDefaultOnDelete = false;
private bool RelaxedCPR = false;

/// <summary>
/// Use relaxed version of CPR, where ops pending I/O
/// are not part of CPR checkpoint. This mode allows
/// us to eliminate the WAIT_PENDING phase, and allows
/// sessions to be suspended. Do not modify during checkpointing.
/// </summary>
public void UseRelaxedCPR() => RelaxedCPR = true;

/// <summary>
/// Number of used entries in hash index
Expand Down Expand Up @@ -68,7 +76,7 @@ private enum CheckpointType
private HybridLogCheckpointInfo _hybridLogCheckpoint;


private ConcurrentDictionary<Guid, long> _recoveredSessions;
private ConcurrentDictionary<Guid, CommitPoint> _recoveredSessions;

private readonly FastThreadLocal<FasterExecutionContext> prevThreadCtx;
private readonly FastThreadLocal<FasterExecutionContext> threadCtx;
Expand Down Expand Up @@ -292,7 +300,7 @@ public Guid StartSession()
/// </summary>
/// <param name="guid"></param>
/// <returns></returns>
public long ContinueSession(Guid guid)
public CommitPoint ContinueSession(Guid guid)
{
return InternalContinue(guid);
}
Expand Down
Loading

0 comments on commit 9afc5fb

Please sign in to comment.