Skip to content

Commit

Permalink
retrieve latest ckpt tokens and file sizes; access checkpoint manager (
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois authored and badrishc committed Nov 10, 2022
1 parent d22d6fc commit 0395aa2
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
const byte logTokenCount = 1;
const byte flogCommitCount = 1;

private readonly INamedDeviceFactory deviceFactory;
private readonly ICheckpointNamingScheme checkpointNamingScheme;
/// <summary>
/// deviceFactory
/// </summary>
protected readonly INamedDeviceFactory deviceFactory;

/// <summary>
/// checkpointNamingScheme
/// </summary>
protected readonly ICheckpointNamingScheme checkpointNamingScheme;
private readonly SemaphoreSlim semaphore;

private readonly bool removeOutdated;
Expand Down Expand Up @@ -161,6 +168,7 @@ public byte[] GetCommitMetadata(long commitNum)
#endregion

#region ICheckpointManager

/// <inheritdoc />
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
{
Expand Down Expand Up @@ -209,7 +217,7 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
}

/// <inheritdoc />
public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
public virtual unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
{
var device = NextLogCheckpointDevice(logToken);

Expand All @@ -233,7 +241,7 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
}

/// <inheritdoc />
public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
public virtual unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
{
deltaLog.Allocate(out int length, out long physicalAddress);
if (length < commitMetadata.Length)
Expand Down Expand Up @@ -261,7 +269,7 @@ public IEnumerable<Guid> GetLogCheckpointTokens()
}

/// <inheritdoc />
public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
public virtual byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
byte[] metadata = null;
if (deltaLog != null && scanDelta)
Expand Down Expand Up @@ -419,7 +427,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
/// <param name="address"></param>
/// <param name="buffer"></param>
/// <param name="size"></param>
private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
Expand All @@ -445,7 +453,7 @@ private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, i
/// <param name="address"></param>
/// <param name="buffer"></param>
/// <param name="size"></param>
private unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public partial class FasterBase
internal ICheckpointManager checkpointManager;
internal bool disposeCheckpointManager;

/// <summary>
/// CheckpointManager
/// </summary>
public ICheckpointManager CheckpointManager => checkpointManager;

// Derived class exposed API
internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
{
Expand Down
111 changes: 88 additions & 23 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,69 @@ internal RecoveryOptions(bool clearLocks, long headAddress, long tailAddress, bo

public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
out IndexCheckpointInfo recoveredICInfo)
/// <summary>
/// GetLatestCheckpointTokens
/// </summary>
/// <param name="hlogToken"></param>
/// <param name="indexToken"></param>
public void GetLatestCheckpointTokens(out Guid hlogToken, out Guid indexToken)
{
GetClosestHybridLogCheckpointInfo(-1, out hlogToken, out var recoveredHlcInfo, out var _);
GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out indexToken, out var _);
}

logger?.LogInformation("********* Primary Recovery Information ********");
/// <summary>
/// Get HLog latest version
/// </summary>
/// <returns></returns>
public long GetLatestCheckpointVersion()
{
GetClosestHybridLogCheckpointInfo(-1, out var hlogToken, out var _, out var _);
if (hlogToken == default)
return -1;
HybridLogCheckpointInfo current = new();
current.Recover(hlogToken, checkpointManager, hlog.LogPageSizeBits,
out var _, false);
return current.info.nextVersion;
}

/// <summary>
/// Get size of snapshot files for token
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public long GetSnapshotFileSizes(Guid token)
{
HybridLogCheckpointInfo current = new();
current.Recover(token, checkpointManager, hlog.LogPageSizeBits,
out var _, false);
return current.info.finalLogicalAddress;
}

/// <summary>
/// Get size of index file for token
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public long GetIndexFileSize(Guid token)
{
IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo();
recoveredICInfo.Recover(token, checkpointManager);
return (long)recoveredICInfo.info.num_ht_bytes;
}

HybridLogCheckpointInfo current, closest = default;
Guid closestToken = default;
private void GetClosestHybridLogCheckpointInfo(
long requestedVersion,
out Guid closestToken,
out HybridLogCheckpointInfo closest,
out byte[] cookie)
{
HybridLogCheckpointInfo current;
long closestVersion = long.MaxValue;
byte[] cookie = default;

closest = default;
closestToken = default;
cookie = default;

// Traverse through all current tokens to find either the largest version or the version that's closest to
// but smaller than the requested version. Need to iterate through all unpruned versions because file system
// is not guaranteed to return tokens in order of freshness.
Expand All @@ -156,7 +208,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
try
{
current = new HybridLogCheckpointInfo();
current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
out var currCookie, false);
var distanceToTarget = (requestedVersion == -1 ? long.MaxValue : requestedVersion) - current.info.version;
// This is larger than intended version, cannot recover to this.
Expand All @@ -171,7 +223,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
cookie = currCookie;
break;
}

// Otherwise, write it down and wait to see if there's a closer one;
if (distanceToTarget < closestVersion)
{
Expand All @@ -193,20 +245,11 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo

logger?.LogInformation("HybridLog Checkpoint: {hybridLogToken}", hybridLogToken);
}
}

recoveredHlcInfo = closest;
recoveredCommitCookie = cookie;
if (recoveredHlcInfo.IsDefault())
throw new FasterException("Unable to find valid HybridLog token");

if (recoveredHlcInfo.deltaLog != null)
{
recoveredHlcInfo.Dispose();
// need to actually scan delta log now
recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
}
recoveredHlcInfo.info.DebugPrint(logger);

private void GetClosestIndexCheckpointInfo(ref HybridLogCheckpointInfo recoveredHlcInfo, out Guid closestToken, out IndexCheckpointInfo recoveredICInfo)
{
closestToken = default;
recoveredICInfo = default;
foreach (var indexToken in checkpointManager.GetIndexCheckpointTokens())
{
Expand All @@ -229,8 +272,30 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo

logger?.LogInformation("Index Checkpoint: {indexToken}", indexToken);
recoveredICInfo.info.DebugPrint(logger);
closestToken = indexToken;
break;
}
}

private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
out IndexCheckpointInfo recoveredICInfo)
{
logger?.LogInformation("********* Primary Recovery Information ********");

GetClosestHybridLogCheckpointInfo(requestedVersion, out var closestToken, out recoveredHlcInfo, out recoveredCommitCookie);

if (recoveredHlcInfo.IsDefault())
throw new FasterException("Unable to find valid HybridLog token");

if (recoveredHlcInfo.deltaLog != null)
{
recoveredHlcInfo.Dispose();
// need to actually scan delta log now
recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
}
recoveredHlcInfo.info.DebugPrint(logger);

GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out _, out recoveredICInfo);

if (recoveredICInfo.IsDefault())
{
Expand Down Expand Up @@ -383,7 +448,7 @@ private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckp
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
_recoveredSessionNameMap = recoveredHLCInfo.info.sessionNameMap;
maxSessionID = recoveredHLCInfo.info.maxSessionID;
maxSessionID = Math.Max(recoveredHLCInfo.info.maxSessionID, maxSessionID);
checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}
Expand Down

0 comments on commit 0395aa2

Please sign in to comment.