Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint Retrieval and Management from User Space #756

Merged
merged 1 commit into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
const byte logTokenCount = 1;
const byte flogCommitCount = 1;

private readonly INamedDeviceFactory deviceFactory;
private readonly ICheckpointNamingScheme checkpointNamingScheme;
/// <summary>
/// deviceFactory
/// </summary>
protected readonly INamedDeviceFactory deviceFactory;

/// <summary>
/// checkpointNamingScheme
/// </summary>
protected readonly ICheckpointNamingScheme checkpointNamingScheme;
private readonly SemaphoreSlim semaphore;

private readonly bool removeOutdated;
Expand Down Expand Up @@ -161,6 +168,7 @@ public byte[] GetCommitMetadata(long commitNum)
#endregion

#region ICheckpointManager

/// <inheritdoc />
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
{
Expand Down Expand Up @@ -209,7 +217,7 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
}

/// <inheritdoc />
public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
public virtual unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
{
var device = NextLogCheckpointDevice(logToken);

Expand All @@ -233,7 +241,7 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
}

/// <inheritdoc />
public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
public virtual unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
{
deltaLog.Allocate(out int length, out long physicalAddress);
if (length < commitMetadata.Length)
Expand Down Expand Up @@ -261,7 +269,7 @@ public IEnumerable<Guid> GetLogCheckpointTokens()
}

/// <inheritdoc />
public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
public virtual byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
byte[] metadata = null;
if (deltaLog != null && scanDelta)
Expand Down Expand Up @@ -419,7 +427,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
/// <param name="address"></param>
/// <param name="buffer"></param>
/// <param name="size"></param>
private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
Expand All @@ -445,7 +453,7 @@ private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, i
/// <param name="address"></param>
/// <param name="buffer"></param>
/// <param name="size"></param>
private unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public partial class FasterBase
internal ICheckpointManager checkpointManager;
internal bool disposeCheckpointManager;

/// <summary>
/// CheckpointManager
/// </summary>
public ICheckpointManager CheckpointManager => checkpointManager;

// Derived class exposed API
internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
{
Expand Down
111 changes: 88 additions & 23 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,69 @@ internal RecoveryOptions(bool clearLocks, long headAddress, long tailAddress, bo

public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
out IndexCheckpointInfo recoveredICInfo)
/// <summary>
/// GetLatestCheckpointTokens
/// </summary>
/// <param name="hlogToken"></param>
/// <param name="indexToken"></param>
public void GetLatestCheckpointTokens(out Guid hlogToken, out Guid indexToken)
{
GetClosestHybridLogCheckpointInfo(-1, out hlogToken, out var recoveredHlcInfo, out var _);
GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out indexToken, out var _);
}

logger?.LogInformation("********* Primary Recovery Information ********");
/// <summary>
/// Get HLog latest version
/// </summary>
/// <returns></returns>
public long GetLatestCheckpointVersion()
{
GetClosestHybridLogCheckpointInfo(-1, out var hlogToken, out var _, out var _);
if (hlogToken == default)
return -1;
HybridLogCheckpointInfo current = new();
current.Recover(hlogToken, checkpointManager, hlog.LogPageSizeBits,
out var _, false);
return current.info.nextVersion;
}

/// <summary>
/// Get size of snapshot files for token
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public long GetSnapshotFileSizes(Guid token)
{
HybridLogCheckpointInfo current = new();
current.Recover(token, checkpointManager, hlog.LogPageSizeBits,
out var _, false);
return current.info.finalLogicalAddress;
}

/// <summary>
/// Get size of index file for token
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public long GetIndexFileSize(Guid token)
{
IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo();
recoveredICInfo.Recover(token, checkpointManager);
return (long)recoveredICInfo.info.num_ht_bytes;
}

HybridLogCheckpointInfo current, closest = default;
Guid closestToken = default;
private void GetClosestHybridLogCheckpointInfo(
long requestedVersion,
out Guid closestToken,
out HybridLogCheckpointInfo closest,
out byte[] cookie)
{
HybridLogCheckpointInfo current;
long closestVersion = long.MaxValue;
byte[] cookie = default;

closest = default;
closestToken = default;
cookie = default;

// Traverse through all current tokens to find either the largest version or the version that's closest to
// but smaller than the requested version. Need to iterate through all unpruned versions because file system
// is not guaranteed to return tokens in order of freshness.
Expand All @@ -156,7 +208,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
try
{
current = new HybridLogCheckpointInfo();
current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
out var currCookie, false);
var distanceToTarget = (requestedVersion == -1 ? long.MaxValue : requestedVersion) - current.info.version;
// This is larger than intended version, cannot recover to this.
Expand All @@ -171,7 +223,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
cookie = currCookie;
break;
}

// Otherwise, write it down and wait to see if there's a closer one;
if (distanceToTarget < closestVersion)
{
Expand All @@ -193,20 +245,11 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo

logger?.LogInformation("HybridLog Checkpoint: {hybridLogToken}", hybridLogToken);
}
}

recoveredHlcInfo = closest;
recoveredCommitCookie = cookie;
if (recoveredHlcInfo.IsDefault())
throw new FasterException("Unable to find valid HybridLog token");

if (recoveredHlcInfo.deltaLog != null)
{
recoveredHlcInfo.Dispose();
// need to actually scan delta log now
recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
}
recoveredHlcInfo.info.DebugPrint(logger);

private void GetClosestIndexCheckpointInfo(ref HybridLogCheckpointInfo recoveredHlcInfo, out Guid closestToken, out IndexCheckpointInfo recoveredICInfo)
{
closestToken = default;
recoveredICInfo = default;
foreach (var indexToken in checkpointManager.GetIndexCheckpointTokens())
{
Expand All @@ -229,8 +272,30 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo

logger?.LogInformation("Index Checkpoint: {indexToken}", indexToken);
recoveredICInfo.info.DebugPrint(logger);
closestToken = indexToken;
break;
}
}

private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
out IndexCheckpointInfo recoveredICInfo)
{
logger?.LogInformation("********* Primary Recovery Information ********");

GetClosestHybridLogCheckpointInfo(requestedVersion, out var closestToken, out recoveredHlcInfo, out recoveredCommitCookie);

if (recoveredHlcInfo.IsDefault())
throw new FasterException("Unable to find valid HybridLog token");

if (recoveredHlcInfo.deltaLog != null)
{
recoveredHlcInfo.Dispose();
// need to actually scan delta log now
recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
}
recoveredHlcInfo.info.DebugPrint(logger);

GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out _, out recoveredICInfo);

if (recoveredICInfo.IsDefault())
{
Expand Down Expand Up @@ -383,7 +448,7 @@ private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckp
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
_recoveredSessionNameMap = recoveredHLCInfo.info.sessionNameMap;
maxSessionID = recoveredHLCInfo.info.maxSessionID;
maxSessionID = Math.Max(recoveredHLCInfo.info.maxSessionID, maxSessionID);
checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}
Expand Down