diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index df9271a6e..de0b0f9d8 100644 --- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -20,8 +20,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa const byte logTokenCount = 1; const byte flogCommitCount = 1; - private readonly INamedDeviceFactory deviceFactory; - private readonly ICheckpointNamingScheme checkpointNamingScheme; + /// + /// deviceFactory + /// + protected readonly INamedDeviceFactory deviceFactory; + + /// + /// checkpointNamingScheme + /// + protected readonly ICheckpointNamingScheme checkpointNamingScheme; private readonly SemaphoreSlim semaphore; private readonly bool removeOutdated; @@ -161,6 +168,7 @@ public byte[] GetCommitMetadata(long commitNum) #endregion #region ICheckpointManager + /// public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata) { @@ -209,7 +217,7 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken) } /// - public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) + public virtual unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) { var device = NextLogCheckpointDevice(logToken); @@ -233,7 +241,7 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) } /// - public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog) + public virtual unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog) { deltaLog.Allocate(out int length, out long physicalAddress); if (length < commitMetadata.Length) @@ -261,7 +269,7 @@ public IEnumerable GetLogCheckpointTokens() } /// - public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) + public virtual byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) { byte[] metadata = null; if (deltaLog != null && scanDelta) @@ -419,7 +427,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context) /// /// /// - private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size) + protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size) { if (bufferPool == null) bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize); @@ -445,7 +453,7 @@ private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, i /// /// /// - private unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size) + protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size) { if (bufferPool == null) bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize); diff --git a/cs/src/core/Index/Recovery/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs index 8085b20e8..890d7c9e1 100644 --- a/cs/src/core/Index/Recovery/IndexRecovery.cs +++ b/cs/src/core/Index/Recovery/IndexRecovery.cs @@ -16,6 +16,11 @@ public partial class FasterBase internal ICheckpointManager checkpointManager; internal bool disposeCheckpointManager; + /// + /// CheckpointManager + /// + public ICheckpointManager CheckpointManager => checkpointManager; + // Derived class exposed API internal void RecoverFuzzyIndex(IndexCheckpointInfo info) { diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index f7b240233..697dba2d7 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -137,17 +137,69 @@ internal RecoveryOptions(bool clearLocks, long headAddress, long tailAddress, bo public partial class FasterKV : FasterBase, IFasterKV { - private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo, - out IndexCheckpointInfo recoveredICInfo) + /// + /// GetLatestCheckpointTokens + /// + /// + /// + public void GetLatestCheckpointTokens(out Guid hlogToken, out Guid indexToken) { + GetClosestHybridLogCheckpointInfo(-1, out hlogToken, out var recoveredHlcInfo, out var _); + GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out indexToken, out var _); + } - logger?.LogInformation("********* Primary Recovery Information ********"); + /// + /// Get HLog latest version + /// + /// + public long GetLatestCheckpointVersion() + { + GetClosestHybridLogCheckpointInfo(-1, out var hlogToken, out var _, out var _); + if (hlogToken == default) + return -1; + HybridLogCheckpointInfo current = new(); + current.Recover(hlogToken, checkpointManager, hlog.LogPageSizeBits, + out var _, false); + return current.info.nextVersion; + } + + /// + /// Get size of snapshot files for token + /// + /// + /// + public long GetSnapshotFileSizes(Guid token) + { + HybridLogCheckpointInfo current = new(); + current.Recover(token, checkpointManager, hlog.LogPageSizeBits, + out var _, false); + return current.info.finalLogicalAddress; + } + + /// + /// Get size of index file for token + /// + /// + /// + public long GetIndexFileSize(Guid token) + { + IndexCheckpointInfo recoveredICInfo = new IndexCheckpointInfo(); + recoveredICInfo.Recover(token, checkpointManager); + return (long)recoveredICInfo.info.num_ht_bytes; + } - HybridLogCheckpointInfo current, closest = default; - Guid closestToken = default; + private void GetClosestHybridLogCheckpointInfo( + long requestedVersion, + out Guid closestToken, + out HybridLogCheckpointInfo closest, + out byte[] cookie) + { + HybridLogCheckpointInfo current; long closestVersion = long.MaxValue; - byte[] cookie = default; - + closest = default; + closestToken = default; + cookie = default; + // Traverse through all current tokens to find either the largest version or the version that's closest to // but smaller than the requested version. Need to iterate through all unpruned versions because file system // is not guaranteed to return tokens in order of freshness. @@ -156,7 +208,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo try { current = new HybridLogCheckpointInfo(); - current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits, + current.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits, out var currCookie, false); var distanceToTarget = (requestedVersion == -1 ? long.MaxValue : requestedVersion) - current.info.version; // This is larger than intended version, cannot recover to this. @@ -171,7 +223,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo cookie = currCookie; break; } - + // Otherwise, write it down and wait to see if there's a closer one; if (distanceToTarget < closestVersion) { @@ -193,20 +245,11 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo logger?.LogInformation("HybridLog Checkpoint: {hybridLogToken}", hybridLogToken); } + } - recoveredHlcInfo = closest; - recoveredCommitCookie = cookie; - if (recoveredHlcInfo.IsDefault()) - throw new FasterException("Unable to find valid HybridLog token"); - - if (recoveredHlcInfo.deltaLog != null) - { - recoveredHlcInfo.Dispose(); - // need to actually scan delta log now - recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true); - } - recoveredHlcInfo.info.DebugPrint(logger); - + private void GetClosestIndexCheckpointInfo(ref HybridLogCheckpointInfo recoveredHlcInfo, out Guid closestToken, out IndexCheckpointInfo recoveredICInfo) + { + closestToken = default; recoveredICInfo = default; foreach (var indexToken in checkpointManager.GetIndexCheckpointTokens()) { @@ -229,8 +272,30 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo logger?.LogInformation("Index Checkpoint: {indexToken}", indexToken); recoveredICInfo.info.DebugPrint(logger); + closestToken = indexToken; break; } + } + + private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo recoveredHlcInfo, + out IndexCheckpointInfo recoveredICInfo) + { + logger?.LogInformation("********* Primary Recovery Information ********"); + + GetClosestHybridLogCheckpointInfo(requestedVersion, out var closestToken, out recoveredHlcInfo, out recoveredCommitCookie); + + if (recoveredHlcInfo.IsDefault()) + throw new FasterException("Unable to find valid HybridLog token"); + + if (recoveredHlcInfo.deltaLog != null) + { + recoveredHlcInfo.Dispose(); + // need to actually scan delta log now + recoveredHlcInfo.Recover(closestToken, checkpointManager, hlog.LogPageSizeBits, out _, true); + } + recoveredHlcInfo.info.DebugPrint(logger); + + GetClosestIndexCheckpointInfo(ref recoveredHlcInfo, out _, out recoveredICInfo); if (recoveredICInfo.IsDefault()) { @@ -383,7 +448,7 @@ private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckp hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress); _recoveredSessions = recoveredHLCInfo.info.continueTokens; _recoveredSessionNameMap = recoveredHLCInfo.info.sessionNameMap; - maxSessionID = recoveredHLCInfo.info.maxSessionID; + maxSessionID = Math.Max(recoveredHLCInfo.info.maxSessionID, maxSessionID); checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid); recoveredHLCInfo.Dispose(); }