From 0395aa2e70ff71f2d906721bb8027256b8208936 Mon Sep 17 00:00:00 2001
From: vazois <96085550+vazois@users.noreply.github.com>
Date: Mon, 17 Oct 2022 17:26:47 -0700
Subject: [PATCH] retrieve latest ckpt tokens and file sizes; access checkpoint
manager (#756)
---
.../DeviceLogCommitCheckpointManager.cs | 22 ++--
cs/src/core/Index/Recovery/IndexRecovery.cs | 5 +
cs/src/core/Index/Recovery/Recovery.cs | 111 ++++++++++++++----
3 files changed, 108 insertions(+), 30 deletions(-)
diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
index df9271a6e..de0b0f9d8 100644
--- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
+++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
@@ -20,8 +20,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
const byte logTokenCount = 1;
const byte flogCommitCount = 1;
- private readonly INamedDeviceFactory deviceFactory;
- private readonly ICheckpointNamingScheme checkpointNamingScheme;
+ ///
+ /// deviceFactory
+ ///
+ protected readonly INamedDeviceFactory deviceFactory;
+
+ ///
+ /// checkpointNamingScheme
+ ///
+ protected readonly ICheckpointNamingScheme checkpointNamingScheme;
private readonly SemaphoreSlim semaphore;
private readonly bool removeOutdated;
@@ -161,6 +168,7 @@ public byte[] GetCommitMetadata(long commitNum)
#endregion
#region ICheckpointManager
+
///
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
{
@@ -209,7 +217,7 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
}
///
- public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
+ public virtual unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
{
var device = NextLogCheckpointDevice(logToken);
@@ -233,7 +241,7 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
}
///
- public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
+ public virtual unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog)
{
deltaLog.Allocate(out int length, out long physicalAddress);
if (length < commitMetadata.Length)
@@ -261,7 +269,7 @@ public IEnumerable GetLogCheckpointTokens()
}
///
- public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
+ public virtual byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
byte[] metadata = null;
if (deltaLog != null && scanDelta)
@@ -419,7 +427,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
///
///
///
- private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
+ protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
@@ -445,7 +453,7 @@ private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, i
///
///
///
- private unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
+ protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
diff --git a/cs/src/core/Index/Recovery/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs
index 8085b20e8..890d7c9e1 100644
--- a/cs/src/core/Index/Recovery/IndexRecovery.cs
+++ b/cs/src/core/Index/Recovery/IndexRecovery.cs
@@ -16,6 +16,11 @@ public partial class FasterBase
internal ICheckpointManager checkpointManager;
internal bool disposeCheckpointManager;
+ ///
+ /// CheckpointManager
+ ///
+ public ICheckpointManager CheckpointManager => checkpointManager;
+
// Derived class exposed API
internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
{
diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs
index f7b240233..697dba2d7 100644
--- a/cs/src/core/Index/Recovery/Recovery.cs
+++ b/cs/src/core/Index/Recovery/Recovery.cs
@@ -137,17 +137,69 @@ internal RecoveryOptions(bool clearLocks, long headAddress, long tailAddress, bo
public partial class FasterKV : FasterBase, IFasterKV
{
- private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
- out IndexCheckpointInfo recoveredICInfo)
+ ///
+ /// GetLatestCheckpointTokens
+ ///
+ ///
+ ///
+ public void GetLatestCheckpointTokens(out Guid hlogToken, out Guid indexToken)
{
+ GetClosestHybridLogCheckpointInfo(-1, out hlogToken, out var recoveredHlcInfo, out var _);
+ GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out indexToken, out var _);
+ }
- logger?.LogInformation("********* Primary Recovery Information ********");
+ ///
+ /// Get HLog latest version
+ ///
+ ///
+ public long GetLatestCheckpointVersion()
+ {
+ GetClosestHybridLogCheckpointInfo(-1, out var hlogToken, out var _, out var _);
+ if (hlogToken == default)
+ return -1;
+ HybridLogCheckpointInfo current = new();
+ current.Recover(hlogToken, checkpointManager, hlog.LogPageSizeBits,
+ out var _, false);
+ return current.info.nextVersion;
+ }
+
+ ///
+ /// Get size of snapshot files for token
+ ///
+ ///
+ ///
+ public long GetSnapshotFileSizes(Guid token)
+ {
+ HybridLogCheckpointInfo current = new();
+ current.Recover(token, checkpointManager, hlog.LogPageSizeBits,
+ out var _, false);
+ return current.info.finalLogicalAddress;
+ }
+
+ ///
+ /// Get size of index file for token
+ ///
+ ///
+ ///
+ public long GetIndexFileSize(Guid token)
+ {
+ IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo();
+ recoveredICInfo.Recover(token, checkpointManager);
+ return (long)recoveredICInfo.info.num_ht_bytes;
+ }
- HybridLogCheckpointInfo current, closest = default;
- Guid closestToken = default;
+ private void GetClosestHybridLogCheckpointInfo(
+ long requestedVersion,
+ out Guid closestToken,
+ out HybridLogCheckpointInfo closest,
+ out byte[] cookie)
+ {
+ HybridLogCheckpointInfo current;
long closestVersion = long.MaxValue;
- byte[] cookie = default;
-
+ closest = default;
+ closestToken = default;
+ cookie = default;
+
// Traverse through all current tokens to find either the largest version or the version that's closest to
// but smaller than the requested version. Need to iterate through all unpruned versions because file system
// is not guaranteed to return tokens in order of freshness.
@@ -156,7 +208,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
try
{
current = new HybridLogCheckpointInfo();
- current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
+ current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits,
out var currCookie, false);
var distanceToTarget = (requestedVersion == -1 ? long.MaxValue : requestedVersion) - current.info.version;
// This is larger than intended version, cannot recover to this.
@@ -171,7 +223,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
cookie = currCookie;
break;
}
-
+
// Otherwise, write it down and wait to see if there's a closer one;
if (distanceToTarget < closestVersion)
{
@@ -193,20 +245,11 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
logger?.LogInformation("HybridLog Checkpoint: {hybridLogToken}", hybridLogToken);
}
+ }
- recoveredHlcInfo = closest;
- recoveredCommitCookie = cookie;
- if (recoveredHlcInfo.IsDefault())
- throw new FasterException("Unable to find valid HybridLog token");
-
- if (recoveredHlcInfo.deltaLog != null)
- {
- recoveredHlcInfo.Dispose();
- // need to actually scan delta log now
- recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
- }
- recoveredHlcInfo.info.DebugPrint(logger);
-
+ private void GetClosestIndexCheckpointInfo(ref HybridLogCheckpointInfo recoveredHlcInfo, out Guid closestToken, out IndexCheckpointInfo recoveredICInfo)
+ {
+ closestToken = default;
recoveredICInfo = default;
foreach (var indexToken in checkpointManager.GetIndexCheckpointTokens())
{
@@ -229,8 +272,30 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo
logger?.LogInformation("Index Checkpoint: {indexToken}", indexToken);
recoveredICInfo.info.DebugPrint(logger);
+ closestToken = indexToken;
break;
}
+ }
+
+ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo,
+ out IndexCheckpointInfo recoveredICInfo)
+ {
+ logger?.LogInformation("********* Primary Recovery Information ********");
+
+ GetClosestHybridLogCheckpointInfo(requestedVersion, out var closestToken, out recoveredHlcInfo, out recoveredCommitCookie);
+
+ if (recoveredHlcInfo.IsDefault())
+ throw new FasterException("Unable to find valid HybridLog token");
+
+ if (recoveredHlcInfo.deltaLog != null)
+ {
+ recoveredHlcInfo.Dispose();
+ // need to actually scan delta log now
+ recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true);
+ }
+ recoveredHlcInfo.info.DebugPrint(logger);
+
+ GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out _, out recoveredICInfo);
if (recoveredICInfo.IsDefault())
{
@@ -383,7 +448,7 @@ private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckp
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
_recoveredSessionNameMap = recoveredHLCInfo.info.sessionNameMap;
- maxSessionID = recoveredHLCInfo.info.maxSessionID;
+ maxSessionID = Math.Max(recoveredHLCInfo.info.maxSessionID, maxSessionID);
checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
recoveredHLCInfo.Dispose();
}