From c617bebb343747a4a51ec035b6e4bbf10644619b Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 25 Jul 2019 18:21:01 -0700 Subject: [PATCH] Make checkpointing pluggable (#161) We make FASTER C# checkpointing use a pluggable user-specified interface (ICheckpointManager) for providing devices such as for index and snapshot and for performing the atomic metadata commit. We refactor the current implementation (currently hard coded to use LocalStorageDevice and C# Streams) as a default and reference implementation. --- cs/src/core/Allocator/MallocFixedPageSize.cs | 10 +- .../core/Index/Common/CheckpointSettings.cs | 13 +- cs/src/core/Index/Common/Contexts.cs | 305 +++++------------- cs/src/core/Index/FASTER/FASTER.cs | 8 +- .../Index/{FASTER => Recovery}/Checkpoint.cs | 78 +---- .../Index/Recovery/DirectoryConfiguration.cs | 134 ++++++++ .../core/Index/Recovery/ICheckpointManager.cs | 111 +++++++ .../{FASTER => Recovery}/IndexCheckpoint.cs | 11 +- .../{FASTER => Recovery}/IndexRecovery.cs | 18 +- .../Index/Recovery/LocalCheckpointManager.cs | 206 ++++++++++++ .../Index/{FASTER => Recovery}/Recovery.cs | 80 +---- cs/test/FullRecoveryTests.cs | 2 +- cs/test/ObjectRecoveryTest.cs | 2 +- cs/test/SharedDirectoryTests.cs | 2 +- 14 files changed, 600 insertions(+), 380 deletions(-) rename cs/src/core/Index/{FASTER => Recovery}/Checkpoint.cs (91%) create mode 100644 cs/src/core/Index/Recovery/DirectoryConfiguration.cs create mode 100644 cs/src/core/Index/Recovery/ICheckpointManager.cs rename cs/src/core/Index/{FASTER => Recovery}/IndexCheckpoint.cs (89%) rename cs/src/core/Index/{FASTER => Recovery}/IndexRecovery.cs (87%) create mode 100644 cs/src/core/Index/Recovery/LocalCheckpointManager.cs rename cs/src/core/Index/{FASTER => Recovery}/Recovery.cs (83%) diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs index d08beb2e5..846551c4f 100644 --- a/cs/src/core/Allocator/MallocFixedPageSize.cs +++ b/cs/src/core/Allocator/MallocFixedPageSize.cs @@ -470,10 +470,11 @@ public void Dispose() /// Public facing persistence API /// /// + /// /// - public void TakeCheckpoint(IDevice device, out ulong numBytes) + public void TakeCheckpoint(IDevice device, ulong start_offset, out ulong numBytes) { - BeginCheckpoint(device, 0UL, out numBytes); + BeginCheckpoint(device, start_offset, out numBytes); } /// @@ -563,9 +564,10 @@ public int GetPageSize() /// /// /// - public void Recover(IDevice device, int buckets, ulong numBytes) + /// + public void Recover(IDevice device, ulong offset, int buckets, ulong numBytes) { - BeginRecovery(device, 0UL, buckets, numBytes, out ulong numBytesRead); + BeginRecovery(device, offset, buckets, numBytes, out ulong numBytesRead); } /// diff --git a/cs/src/core/Index/Common/CheckpointSettings.cs b/cs/src/core/Index/Common/CheckpointSettings.cs index ba76f5016..6e20f6dbb 100644 --- a/cs/src/core/Index/Common/CheckpointSettings.cs +++ b/cs/src/core/Index/Common/CheckpointSettings.cs @@ -2,6 +2,8 @@ // Licensed under the MIT license. +using System; + namespace FASTER.core { /// @@ -27,13 +29,20 @@ public enum CheckpointType public class CheckpointSettings { /// - /// Directory where checkpoints are stored + /// Checkpoint manager /// - public string CheckpointDir = ""; + public ICheckpointManager CheckpointManager = null; /// /// Type of checkpoint /// public CheckpointType CheckPointType = CheckpointType.Snapshot; + + /// + /// Use specified directory for storing and retrieving checkpoints + /// This is a shortcut to providing the following: + /// CheckpointSettings.CheckpointManager = new LocalCheckpointManager(CheckpointDir) + /// + public string CheckpointDir = null; } } diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 16c289b0b..91910ea9f 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -106,139 +106,7 @@ internal class FasterExecutionContext : SerializedFasterExecutionContext } } - internal class DirectoryConfiguration - { - private readonly string checkpointDir; - public DirectoryConfiguration(string checkpointDir) - { - this.checkpointDir = checkpointDir; - } - - public const string index_base_folder = "index-checkpoints"; - public const string index_meta_file = "info"; - public const string hash_table_file = "ht"; - public const string overflow_buckets_file = "ofb"; - public const string snapshot_file = "snapshot"; - - public const string cpr_base_folder = "cpr-checkpoints"; - public const string cpr_meta_file = "info"; - - public void CreateIndexCheckpointFolder(Guid token) - { - var directory = GetIndexCheckpointFolder(token); - Directory.CreateDirectory(directory); - DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); - foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) - file.Delete(); - } - public void CreateHybridLogCheckpointFolder(Guid token) - { - var directory = GetHybridLogCheckpointFolder(token); - Directory.CreateDirectory(directory); - DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); - foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) - file.Delete(); - } - - public string GetIndexCheckpointFolder(Guid token = default(Guid)) - { - if (token != default(Guid)) - return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString()); - else - return GetMergedFolderPath(checkpointDir, index_base_folder); - } - - public string GetHybridLogCheckpointFolder(Guid token = default(Guid)) - { - if (token != default(Guid)) - return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString()); - else - return GetMergedFolderPath(checkpointDir, cpr_base_folder); - } - - public string GetIndexCheckpointMetaFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - index_meta_file, - ".dat"); - } - - public string GetPrimaryHashTableFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - hash_table_file, - ".dat"); - } - - public string GetOverflowBucketsFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - overflow_buckets_file, - ".dat"); - } - - public string GetHybridLogCheckpointMetaFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - cpr_meta_file, - ".dat"); - } - - public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - checkpointToken.ToString(), - sessionToken.ToString(), - ".dat"); - } - - public string GetHybridLogCheckpointFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - snapshot_file, - ".dat"); - } - - public string GetHybridLogObjectCheckpointFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - snapshot_file, - ".obj.dat"); - } - - public static string GetMergedFolderPath(params String[] paths) - { - String fullPath = paths[0]; - - for (int i = 1; i < paths.Length; i++) - { - if (i == paths.Length - 1 && paths[i].Contains(".")) - { - fullPath += paths[i]; - } - else - { - fullPath += Path.DirectorySeparatorChar + paths[i]; - } - } - - return fullPath; - } - } - + /// /// Recovery info for hybrid log /// @@ -280,10 +148,17 @@ public struct HybridLogRecoveryInfo /// Guid array /// public Guid[] guids; + + /// + /// Tokens per guid restored during Continue + /// + public ConcurrentDictionary continueTokens; + /// - /// Tokens per guid + /// Tokens per guid created during Checkpoint /// - public Dictionary continueTokens; + public ConcurrentDictionary checkpointTokens; + /// /// Object log segment offsets /// @@ -305,7 +180,8 @@ public void Initialize(Guid token, int _version) finalLogicalAddress = 0; headAddress = 0; guids = new Guid[LightEpoch.kTableSize + 1]; - continueTokens = new Dictionary(); + continueTokens = new ConcurrentDictionary(); + checkpointTokens = new ConcurrentDictionary(); objectLogSegmentOffsets = null; } @@ -316,7 +192,7 @@ public void Initialize(Guid token, int _version) public void Initialize(StreamReader reader) { guids = new Guid[LightEpoch.kTableSize + 1]; - continueTokens = new Dictionary(); + continueTokens = new ConcurrentDictionary(); string value = reader.ReadLine(); guid = Guid.Parse(value); @@ -346,6 +222,9 @@ public void Initialize(StreamReader reader) { value = reader.ReadLine(); guids[i] = Guid.Parse(value); + value = reader.ReadLine(); + var serialno = long.Parse(value); + continueTokens.TryAdd(guids[i], serialno); } // Read object log segment offsets @@ -362,51 +241,19 @@ public void Initialize(StreamReader reader) } } - /// - /// Recover info from token and checkpoint directory - /// - /// - /// - /// - public bool Recover(Guid token, string checkpointDir) - { - return Recover(token, new DirectoryConfiguration(checkpointDir)); - } - /// /// Recover info from token /// /// - /// + /// /// - internal bool Recover(Guid token, DirectoryConfiguration directoryConfiguration) + internal void Recover(Guid token, ICheckpointManager checkpointManager) { - string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(token); - using (var reader = new StreamReader(checkpointInfoFile)) - { - Initialize(reader); - } - - int num_threads = numThreads; - for (int i = 0; i < num_threads; i++) - { - var guid = guids[i]; - using (var reader = new StreamReader(directoryConfiguration.GetHybridLogCheckpointContextFileName(token, guid))) - { - var ctx = new SerializedFasterExecutionContext(); - ctx.Load(reader); - continueTokens.Add(ctx.guid, ctx.serialNum); - } - } + var metadata = checkpointManager.GetLogCommitMetadata(token); + if (metadata == null) + throw new Exception("Invalid log commit metadata for ID " + token.ToString()); - if (continueTokens.Count == num_threads) - { - return true; - } - else - { - return false; - } + Initialize(new StreamReader(new MemoryStream(metadata))); } /// @@ -418,32 +265,39 @@ public void Reset() } /// - /// Write info to file + /// Write info to byte array /// - /// - public void Write(StreamWriter writer) + public byte[] ToByteArray() { - writer.WriteLine(guid); - writer.WriteLine(useSnapshotFile); - writer.WriteLine(version); - writer.WriteLine(flushedLogicalAddress); - writer.WriteLine(startLogicalAddress); - writer.WriteLine(finalLogicalAddress); - writer.WriteLine(headAddress); - writer.WriteLine(numThreads); - for (int i = 0; i < numThreads; i++) + using (var ms = new MemoryStream()) { - writer.WriteLine(guids[i]); - } - - //Write object log segment offsets - writer.WriteLine(objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length); - if (objectLogSegmentOffsets != null) - { - for (int i = 0; i < objectLogSegmentOffsets.Length; i++) + using (StreamWriter writer = new StreamWriter(ms)) { - writer.WriteLine(objectLogSegmentOffsets[i]); + writer.WriteLine(guid); + writer.WriteLine(useSnapshotFile); + writer.WriteLine(version); + writer.WriteLine(flushedLogicalAddress); + writer.WriteLine(startLogicalAddress); + writer.WriteLine(finalLogicalAddress); + writer.WriteLine(headAddress); + writer.WriteLine(numThreads); + for (int i = 0; i < numThreads; i++) + { + writer.WriteLine(guids[i]); + writer.WriteLine(checkpointTokens[guids[i]]); + } + + // Write object log segment offsets + writer.WriteLine(objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length); + if (objectLogSegmentOffsets != null) + { + for (int i = 0; i < objectLogSegmentOffsets.Length; i++) + { + writer.WriteLine(objectLogSegmentOffsets[i]); + } + } } + return ms.ToArray(); } } @@ -476,16 +330,19 @@ internal struct HybridLogCheckpointInfo public CountdownEvent flushed; public long started; - public void Initialize(Guid token, int _version) + public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager) { info.Initialize(token, _version); started = 0; + checkpointManager.InitializeLogCheckpoint(token); } - public void Recover(Guid token, DirectoryConfiguration directoryConfiguration) + + public void Recover(Guid token, ICheckpointManager checkpointManager) { - info.Recover(token, directoryConfiguration); + info.Recover(token, checkpointManager); started = 0; } + public void Reset() { started = 0; @@ -539,24 +396,34 @@ public void Initialize(StreamReader reader) value = reader.ReadLine(); finalLogicalAddress = long.Parse(value); } - public void Recover(Guid guid, DirectoryConfiguration directoryConfiguration) + + public void Recover(Guid guid, ICheckpointManager checkpointManager) { - string indexInfoFile = directoryConfiguration.GetIndexCheckpointMetaFileName(guid); - using (var reader = new StreamReader(indexInfoFile)) - { - Initialize(reader); - } + var metadata = checkpointManager.GetIndexCommitMetadata(guid); + if (metadata == null) + throw new Exception("Invalid index commit metadata for ID " + guid.ToString()); + Initialize(new StreamReader(new MemoryStream(metadata))); } - public void Write(StreamWriter writer) + + public byte[] ToByteArray() { - writer.WriteLine(token); - writer.WriteLine(table_size); - writer.WriteLine(num_ht_bytes); - writer.WriteLine(num_ofb_bytes); - writer.WriteLine(num_buckets); - writer.WriteLine(startLogicalAddress); - writer.WriteLine(finalLogicalAddress); + using (var ms = new MemoryStream()) + { + using (var writer = new StreamWriter(ms)) + { + + writer.WriteLine(token); + writer.WriteLine(table_size); + writer.WriteLine(num_ht_bytes); + writer.WriteLine(num_ofb_bytes); + writer.WriteLine(num_buckets); + writer.WriteLine(startLogicalAddress); + writer.WriteLine(finalLogicalAddress); + } + return ms.ToArray(); + } } + public void DebugPrint() { Debug.WriteLine("******** Index Checkpoint Info for {0} ********", token); @@ -583,23 +450,21 @@ internal struct IndexCheckpointInfo { public IndexRecoveryInfo info; public IDevice main_ht_device; - public IDevice ofb_device; - public void Initialize(Guid token, long _size, DirectoryConfiguration directoryConfiguration) + public void Initialize(Guid token, long _size, ICheckpointManager checkpointManager) { info.Initialize(token, _size); - main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false); - ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false); + checkpointManager.InitializeIndexCheckpoint(token); + main_ht_device = checkpointManager.GetIndexDevice(token); } - public void Recover(Guid token, DirectoryConfiguration directoryConfiguration) + public void Recover(Guid token, ICheckpointManager checkpointManager) { - info.Recover(token, directoryConfiguration); + info.Recover(token, checkpointManager); } public void Reset() { info.Reset(); main_ht_device.Close(); - ofb_device.Close(); } } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index e6082f613..1d7e58549 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -4,6 +4,7 @@ #pragma warning disable 0162 using System; +using System.Collections.Concurrent; using System.Runtime.CompilerServices; namespace FASTER.core @@ -65,7 +66,7 @@ private enum CheckpointType private HybridLogCheckpointInfo _hybridLogCheckpoint; - private SafeConcurrentDictionary _recoveredSessions; + private ConcurrentDictionary _recoveredSessions; private FastThreadLocal prevThreadCtx; private FastThreadLocal threadCtx; @@ -104,7 +105,10 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo if (checkpointSettings == null) checkpointSettings = new CheckpointSettings(); - directoryConfiguration = new DirectoryConfiguration(checkpointSettings.CheckpointDir); + if (checkpointSettings.CheckpointDir != null && checkpointSettings.CheckpointManager != null) + throw new Exception("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both"); + + checkpointManager = checkpointSettings.CheckpointManager ?? new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? ""); FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; CopyReadsToTail = logSettings.CopyReadsToTail; diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs similarity index 91% rename from cs/src/core/Index/FASTER/Checkpoint.cs rename to cs/src/core/Index/Recovery/Checkpoint.cs index 4d27c1ae8..f17a8fc10 100644 --- a/cs/src/core/Index/FASTER/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -247,8 +247,6 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - WriteIndexMetaFile(); - WriteIndexCheckpointCompleteFile(); } _hybridLogCheckpoint.info.headAddress = hlog.HeadAddress; @@ -263,10 +261,8 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.finalLogicalAddress); - _hybridLogCheckpoint.snapshotFileDevice = Devices.CreateLogDevice - (directoryConfiguration.GetHybridLogCheckpointFileName(_hybridLogCheckpointToken), false); - _hybridLogCheckpoint.snapshotFileObjectLogDevice = Devices.CreateLogDevice - (directoryConfiguration.GetHybridLogObjectCheckpointFileName(_hybridLogCheckpointToken), false); + _hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken); + _hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken); _hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize()); _hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(hlog.GetSegmentSize()); @@ -294,7 +290,10 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta case Phase.PERSISTENCE_CALLBACK: { WriteHybridLogMetaInfo(); - WriteHybridLogCheckpointCompleteFile(); + + if (_checkpointType == CheckpointType.FULL) + WriteIndexMetaInfo(); + MakeTransition(intermediateState, nextState); break; } @@ -346,8 +345,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - WriteIndexMetaFile(); - WriteIndexCheckpointCompleteFile(); + WriteIndexMetaInfo(); _indexCheckpoint.Reset(); break; } @@ -526,7 +524,7 @@ private void HandleCheckpointingPhases() if (notify) { - WriteHybridLogContextInfo(); + _hybridLogCheckpoint.info.checkpointTokens.TryAdd(prevThreadCtx.Value.guid, prevThreadCtx.Value.serialNum); if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version)) { @@ -661,12 +659,12 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp case Phase.INDEX_CHECKPOINT: switch(type) { - case CheckpointType.INDEX_ONLY: - nextState.phase = Phase.REST; - break; case CheckpointType.FULL: nextState.phase = Phase.PREPARE; break; + default: + nextState.phase = Phase.REST; + break; } break; case Phase.PREPARE: @@ -701,56 +699,14 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp private void WriteHybridLogMetaInfo() { - string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(_hybridLogCheckpointToken); - using (var file = new StreamWriter(filename, false)) - { - _hybridLogCheckpoint.info.Write(file); - file.Flush(); - } + checkpointManager.CommitLogCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.ToByteArray()); } - private void WriteHybridLogCheckpointCompleteFile() + private void WriteIndexMetaInfo() { - string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(_hybridLogCheckpointToken); - completed_filename += Path.DirectorySeparatorChar + "completed.dat"; - using (var file = new StreamWriter(completed_filename, false)) - { - file.WriteLine(); - file.Flush(); - } + checkpointManager.CommitIndexCheckpoint(_indexCheckpointToken, _indexCheckpoint.info.ToByteArray()); } - private void WriteHybridLogContextInfo() - { - string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.Value.guid); - using (var file = new StreamWriter(filename, false)) - { - prevThreadCtx.Value.Write(file); - file.Flush(); - } - - } - - private void WriteIndexMetaFile() - { - string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(_indexCheckpointToken); - using (var file = new StreamWriter(filename, false)) - { - _indexCheckpoint.info.Write(file); - file.Flush(); - } - } - - private void WriteIndexCheckpointCompleteFile() - { - string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(_indexCheckpointToken); - completed_filename += Path.DirectorySeparatorChar + "completed.dat"; - using (var file = new StreamWriter(completed_filename, false)) - { - file.WriteLine(); - file.Flush(); - } - } private bool ObtainCurrentTailAddress(ref long location) { var tailAddress = hlog.GetTailAddress(); @@ -759,14 +715,12 @@ private bool ObtainCurrentTailAddress(ref long location) private void InitializeIndexCheckpoint(Guid indexToken) { - directoryConfiguration.CreateIndexCheckpointFolder(indexToken); - _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, directoryConfiguration); + _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, checkpointManager); } private void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) { - directoryConfiguration.CreateHybridLogCheckpointFolder(hybridLogToken); - _hybridLogCheckpoint.Initialize(hybridLogToken, version); + _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); } #endregion diff --git a/cs/src/core/Index/Recovery/DirectoryConfiguration.cs b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs new file mode 100644 index 000000000..5868b2172 --- /dev/null +++ b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.IO; + +namespace FASTER.core +{ + class DirectoryConfiguration + { + private readonly string checkpointDir; + + public DirectoryConfiguration(string checkpointDir) + { + this.checkpointDir = checkpointDir; + } + + public const string index_base_folder = "index-checkpoints"; + public const string index_meta_file = "info"; + public const string hash_table_file = "ht"; + public const string overflow_buckets_file = "ofb"; + public const string snapshot_file = "snapshot"; + + public const string cpr_base_folder = "cpr-checkpoints"; + public const string cpr_meta_file = "info"; + + public void CreateIndexCheckpointFolder(Guid token) + { + var directory = GetIndexCheckpointFolder(token); + Directory.CreateDirectory(directory); + DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); + foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) + file.Delete(); + } + public void CreateHybridLogCheckpointFolder(Guid token) + { + var directory = GetHybridLogCheckpointFolder(token); + Directory.CreateDirectory(directory); + DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); + foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) + file.Delete(); + } + + public string GetIndexCheckpointFolder(Guid token = default(Guid)) + { + if (token != default(Guid)) + return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString()); + else + return GetMergedFolderPath(checkpointDir, index_base_folder); + } + + public string GetHybridLogCheckpointFolder(Guid token = default(Guid)) + { + if (token != default(Guid)) + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString()); + else + return GetMergedFolderPath(checkpointDir, cpr_base_folder); + } + + public string GetIndexCheckpointMetaFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + index_meta_file, + ".dat"); + } + + public string GetPrimaryHashTableFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + hash_table_file, + ".dat"); + } + + public string GetOverflowBucketsFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + overflow_buckets_file, + ".dat"); + } + + public string GetHybridLogCheckpointMetaFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + cpr_base_folder, + token.ToString(), + cpr_meta_file, + ".dat"); + } + + public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken) + { + return GetMergedFolderPath(checkpointDir, + cpr_base_folder, + checkpointToken.ToString(), + sessionToken.ToString(), + ".dat"); + } + + public string GetLogSnapshotFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".dat"); + } + + public string GetObjectLogSnapshotFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".obj.dat"); + } + + private static string GetMergedFolderPath(params String[] paths) + { + String fullPath = paths[0]; + + for (int i = 1; i < paths.Length; i++) + { + if (i == paths.Length - 1 && paths[i].Contains(".")) + { + fullPath += paths[i]; + } + else + { + fullPath += Path.DirectorySeparatorChar + paths[i]; + } + } + + return fullPath; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs new file mode 100644 index 000000000..947d06df4 --- /dev/null +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Interface for users to control creation and retrieval of checkpoint-related data + /// FASTER calls this interface during checkpoint/recovery in this sequence: + /// + /// Checkpoint: + /// InitializeIndexCheckpoint (for index checkpoints) -> + /// GetIndexDevice (for index checkpoints) -> + /// InitializeLogCheckpoint (for log checkpoints) -> + /// GetSnapshotLogDevice (for log checkpoints in snapshot mode) -> + /// GetSnapshotObjectLogDevice (for log checkpoints in snapshot mode with objects) -> + /// CommitLogCheckpoint (for log checkpoints) -> + /// CommitIndexCheckpoint (for index checkpoints) -> + /// + /// Recovery: + /// GetLatestCheckpoint (if request to recover to latest checkpoint) -> + /// GetIndexCommitMetadata -> + /// GetLogCommitMetadata -> + /// GetIndexDevice -> + /// GetSnapshotLogDevice (for recovery in snapshot mode) -> + /// GetSnapshotObjectLogDevice (for recovery in snapshot mode with objects) + /// + /// Provided devices will be closed directly by FASTER when done. + /// + public interface ICheckpointManager + { + /// + /// Initialize index checkpoint + /// + /// + void InitializeIndexCheckpoint(Guid indexToken); + + /// + /// Initialize log checkpoint (snapshot and fold-over) + /// + /// + void InitializeLogCheckpoint(Guid logToken); + + /// + /// Commit index checkpoint + /// + /// + /// + /// + void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata); + + /// + /// Commit log checkpoint (snapshot and fold-over) + /// + /// + /// + /// + void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata); + + /// + /// Retrieve commit metadata for specified index checkpoint + /// + /// Token + /// Metadata, or null if invalid + byte[] GetIndexCommitMetadata(Guid indexToken); + + /// + /// Retrieve commit metadata for specified log checkpoint + /// + /// Token + /// Metadata, or null if invalid + byte[] GetLogCommitMetadata(Guid logToken); + + /// + /// Provide device to store index checkpoint (including overflow buckets) + /// + /// + /// + IDevice GetIndexDevice(Guid indexToken); + + /// + /// Provide device to store snapshot of log (required only for snapshot checkpoints) + /// + /// + /// + IDevice GetSnapshotLogDevice(Guid token); + + /// + /// Provide device to store snapshot of object log (required only for snapshot checkpoints) + /// + /// + /// + IDevice GetSnapshotObjectLogDevice(Guid token); + + /// + /// Get latest valid checkpoint for recovery + /// + /// + /// + /// true if latest valid checkpoint found, false otherwise + bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken); + } +} \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/IndexCheckpoint.cs b/cs/src/core/Index/Recovery/IndexCheckpoint.cs similarity index 89% rename from cs/src/core/Index/FASTER/IndexCheckpoint.cs rename to cs/src/core/Index/Recovery/IndexCheckpoint.cs index 0ec78b3ad..b0d228f5f 100644 --- a/cs/src/core/Index/FASTER/IndexCheckpoint.cs +++ b/cs/src/core/Index/Recovery/IndexCheckpoint.cs @@ -28,9 +28,10 @@ internal void TakeIndexFuzzyCheckpoint() TakeMainIndexCheckpoint(ht_version, _indexCheckpoint.main_ht_device, out ulong ht_num_bytes_written); - overflowBucketsAllocator.TakeCheckpoint( - _indexCheckpoint.ofb_device, - out ulong ofb_num_bytes_written); + + var sectorSize = _indexCheckpoint.main_ht_device.SectorSize; + var alignedIndexSize = (uint)((ht_num_bytes_written + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.TakeCheckpoint(_indexCheckpoint.main_ht_device, alignedIndexSize, out ulong ofb_num_bytes_written); _indexCheckpoint.info.num_ht_bytes = ht_num_bytes_written; _indexCheckpoint.info.num_ofb_bytes = ofb_num_bytes_written; } @@ -40,7 +41,9 @@ internal void TakeIndexFuzzyCheckpoint(int ht_version, IDevice device, out ulong ofbnumBytesWritten, out int num_ofb_buckets) { TakeMainIndexCheckpoint(ht_version, device, out numBytesWritten); - overflowBucketsAllocator.TakeCheckpoint(ofbdevice, out ofbnumBytesWritten); + var sectorSize = device.SectorSize; + var alignedIndexSize = (uint)((numBytesWritten + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.TakeCheckpoint(ofbdevice, alignedIndexSize, out ofbnumBytesWritten); num_ofb_buckets = overflowBucketsAllocator.GetMaxValidAddress(); } diff --git a/cs/src/core/Index/FASTER/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs similarity index 87% rename from cs/src/core/Index/FASTER/IndexRecovery.cs rename to cs/src/core/Index/Recovery/IndexRecovery.cs index fca7d230d..d1f0bb918 100644 --- a/cs/src/core/Index/FASTER/IndexRecovery.cs +++ b/cs/src/core/Index/Recovery/IndexRecovery.cs @@ -19,7 +19,7 @@ namespace FASTER.core /// public unsafe partial class FasterBase { - internal DirectoryConfiguration directoryConfiguration; + internal ICheckpointManager checkpointManager; // Derived class exposed API internal void RecoverFuzzyIndex(IndexCheckpointInfo info) @@ -29,24 +29,22 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info) Debug.Assert(state[ht_version].size == info.info.table_size); // Create devices to read from using Async API - info.main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false); - info.ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false); + info.main_ht_device = checkpointManager.GetIndexDevice(token); BeginMainIndexRecovery(ht_version, info.main_ht_device, info.info.num_ht_bytes); - overflowBucketsAllocator.Recover( - info.ofb_device, - info.info.num_buckets, - info.info.num_ofb_bytes); + var sectorSize = info.main_ht_device.SectorSize; + var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1)); + + overflowBucketsAllocator.Recover(info.main_ht_device, alignedIndexSize, info.info.num_buckets, info.info.num_ofb_bytes); // Wait until reading is complete IsFuzzyIndexRecoveryComplete(true); // close index checkpoint files appropriately info.main_ht_device.Close(); - info.ofb_device.Close(); // Delete all tentative entries! DeleteTentativeEntries(); @@ -55,7 +53,9 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info) internal void RecoverFuzzyIndex(int ht_version, IDevice device, ulong num_ht_bytes, IDevice ofbdevice, int num_buckets, ulong num_ofb_bytes) { BeginMainIndexRecovery(ht_version, device, num_ht_bytes); - overflowBucketsAllocator.Recover(ofbdevice, num_buckets, num_ofb_bytes); + var sectorSize = device.SectorSize; + var alignedIndexSize = (uint)((num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.Recover(ofbdevice, alignedIndexSize, num_buckets, num_ofb_bytes); } internal bool IsFuzzyIndexRecoveryComplete(bool waitUntilComplete = false) diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs new file mode 100644 index 000000000..9eb270d95 --- /dev/null +++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs @@ -0,0 +1,206 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Implementation of checkpoint interface for local file storage + /// + public class LocalCheckpointManager : ICheckpointManager + { + private DirectoryConfiguration directoryConfiguration; + + /// + /// Create new instance of local checkpoint manager at given base directory + /// + /// + public LocalCheckpointManager(string CheckpointDir) + { + directoryConfiguration = new DirectoryConfiguration(CheckpointDir); + } + + /// + /// Initialize index checkpoint + /// + /// + public void InitializeIndexCheckpoint(Guid indexToken) + { + directoryConfiguration.CreateIndexCheckpointFolder(indexToken); + } + + /// + /// Initialize log checkpoint (snapshot and fold-over) + /// + /// + public void InitializeLogCheckpoint(Guid logToken) + { + directoryConfiguration.CreateHybridLogCheckpointFolder(logToken); + } + + /// + /// Commit index checkpoint + /// + /// + /// + public void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata) + { + string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken); + using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create))) + { + writer.Write(commitMetadata.Length); + writer.Write(commitMetadata); + writer.Flush(); + } + + string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(indexToken); + completed_filename += Path.DirectorySeparatorChar + "completed.dat"; + using (var file = new FileStream(completed_filename, FileMode.Create)) + { + file.Flush(); + } + } + + /// + /// Commit log checkpoint (snapshot and fold-over) + /// + /// + /// + public void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) + { + string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken); + using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create))) + { + writer.Write(commitMetadata.Length); + writer.Write(commitMetadata); + writer.Flush(); + } + + string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(logToken); + completed_filename += Path.DirectorySeparatorChar + "completed.dat"; + using (var file = new FileStream(completed_filename, FileMode.Create)) + { + file.Flush(); + } + } + + /// + /// Retrieve commit metadata for specified index checkpoint + /// + /// Token + /// Metadata, or null if invalid + public byte[] GetIndexCommitMetadata(Guid indexToken) + { + var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(indexToken)); + if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) + return null; + + string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken); + using (var reader = new BinaryReader(new FileStream(filename, FileMode.Open))) + { + var len = reader.ReadInt32(); + return reader.ReadBytes(len); + } + } + + /// + /// Retrieve commit metadata for specified log checkpoint + /// + /// Token + /// Metadata, or null if invalid + public byte[] GetLogCommitMetadata(Guid logToken) + { + var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(logToken)); + if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) + return null; + + string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken); + using (var reader = new BinaryReader(new FileStream(checkpointInfoFile, FileMode.Open))) + { + var len = reader.ReadInt32(); + return reader.ReadBytes(len); + } + } + + /// + /// Provide device to store index checkpoint (including overflow buckets) + /// + /// + /// + public IDevice GetIndexDevice(Guid indexToken) + { + return Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(indexToken), false); + } + + /// + /// Provide device to store snapshot of log (required only for snapshot checkpoints) + /// + /// + /// + public IDevice GetSnapshotLogDevice(Guid token) + { + return Devices.CreateLogDevice(directoryConfiguration.GetLogSnapshotFileName(token), false); + } + + /// + /// Provide device to store snapshot of object log (required only for snapshot checkpoints) + /// + /// + /// + public IDevice GetSnapshotObjectLogDevice(Guid token) + { + return Devices.CreateLogDevice(directoryConfiguration.GetObjectLogSnapshotFileName(token), false); + } + + /// + /// Get latest valid checkpoint for recovery + /// + /// + /// + /// + public bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken) + { + var indexCheckpointDir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder()); + var dirs = indexCheckpointDir.GetDirectories(); + foreach (var dir in dirs) + { + // Remove incomplete checkpoints + if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) + { + Directory.Delete(dir.FullName, true); + } + } + var latestICFolder = indexCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First(); + if (latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out indexToken)) + { + throw new Exception("No valid index checkpoint to recover from"); + } + + + var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder()); + dirs = hlogCheckpointDir.GetDirectories(); + foreach (var dir in dirs) + { + // Remove incomplete checkpoints + if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) + { + Directory.Delete(dir.FullName, true); + } + } + var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First(); + if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out logToken)) + { + throw new Exception("No valid hybrid log checkpoint to recover from"); + } + return true; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs similarity index 83% rename from cs/src/core/Index/FASTER/Recovery.cs rename to cs/src/core/Index/Recovery/Recovery.cs index 72cf67e53..81bf73da4 100644 --- a/cs/src/core/Index/FASTER/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -62,66 +62,10 @@ public unsafe partial class FasterKV f.LastWriteTime).First(); - if(latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out Guid indexCheckpointGuid)) - { - throw new Exception("No valid index checkpoint to recover from"); - } - - - var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder()); - dirs = hlogCheckpointDir.GetDirectories(); - foreach (var dir in dirs) - { - // Remove incomplete checkpoints - if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) - { - Directory.Delete(dir.FullName, true); - } - } - var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First(); - if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out Guid hybridLogCheckpointGuid)) - { - throw new Exception("No valid hybrid log checkpoint to recover from"); - } - + checkpointManager.GetLatestCheckpoint(out Guid indexCheckpointGuid, out Guid hybridLogCheckpointGuid); InternalRecover(indexCheckpointGuid, hybridLogCheckpointGuid); } - private bool IsCheckpointSafe(Guid token, CheckpointType checkpointType) - { - switch (checkpointType) - { - case CheckpointType.INDEX_ONLY: - { - var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(token)); - return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"); - } - case CheckpointType.HYBRID_LOG_ONLY: - { - var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(token)); - return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"); - } - case CheckpointType.FULL: - { - return IsCheckpointSafe(token, CheckpointType.INDEX_ONLY) - && IsCheckpointSafe(token, CheckpointType.HYBRID_LOG_ONLY); - } - default: - return false; - } - } - private bool IsCompatible(IndexRecoveryInfo indexInfo, HybridLogRecoveryInfo recoveryInfo) { var l1 = indexInfo.finalLogicalAddress; @@ -135,20 +79,13 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) Debug.WriteLine("Index Checkpoint: {0}", indexToken); Debug.WriteLine("HybridLog Checkpoint: {0}", hybridLogToken); - // Assert corresponding checkpoints are safe to recover from - Debug.Assert(IsCheckpointSafe(indexToken, CheckpointType.INDEX_ONLY), - "Cannot recover from incomplete index checkpoint " + indexToken.ToString()); - - Debug.Assert(IsCheckpointSafe(hybridLogToken, CheckpointType.HYBRID_LOG_ONLY), - "Cannot recover from incomplete hybrid log checkpoint " + hybridLogToken.ToString()); - // Recovery appropriate context information var recoveredICInfo = new IndexCheckpointInfo(); - recoveredICInfo.Recover(indexToken, directoryConfiguration); + recoveredICInfo.Recover(indexToken, checkpointManager); recoveredICInfo.info.DebugPrint(); var recoveredHLCInfo = new HybridLogCheckpointInfo(); - recoveredHLCInfo.Recover(hybridLogToken, directoryConfiguration); + recoveredHLCInfo.Recover(hybridLogToken, checkpointManager); recoveredHLCInfo.info.DebugPrint(); // Check if the two checkpoints are compatible for recovery @@ -187,12 +124,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress); // Recover session information - _recoveredSessions = new SafeConcurrentDictionary(); - foreach(var sessionInfo in recoveredHLCInfo.info.continueTokens) - { - - _recoveredSessions.GetOrAdd(sessionInfo.Key, sessionInfo.Value); - } + _recoveredSessions = recoveredHLCInfo.info.continueTokens; } private void RestoreHybridLog(long untilAddress, long headAddress) @@ -338,8 +270,8 @@ private void RecoverHybridLogFromSnapshotFile( // By default first page has one extra record var capacity = hlog.GetCapacityNumPages(); - var recoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogCheckpointFileName(recoveryInfo.guid), false); - var objectLogRecoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogObjectCheckpointFileName(recoveryInfo.guid), false); + var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid); + var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid); recoveryDevice.Initialize(hlog.GetSegmentSize()); objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize()); var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress) diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/FullRecoveryTests.cs index 12d91aded..b07f129be 100644 --- a/cs/test/FullRecoveryTests.cs +++ b/cs/test/FullRecoveryTests.cs @@ -170,7 +170,7 @@ public void RecoverAndTest(Guid cprVersion, Guid indexVersion) // Test outputs var checkpointInfo = default(HybridLogRecoveryInfo); - checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path)); + checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path)); // Compute expected array long[] expected = new long[numUniqueKeys]; diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs index 670afbce7..10287f21d 100644 --- a/cs/test/ObjectRecoveryTest.cs +++ b/cs/test/ObjectRecoveryTest.cs @@ -189,7 +189,7 @@ public unsafe void RecoverAndTest(Guid cprVersion, Guid indexVersion) // Test outputs var checkpointInfo = default(HybridLogRecoveryInfo); - checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path)); + checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path)); // Compute expected array long[] expected = new long[numUniqueKeys]; diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs index 849956bce..7c4e5c9ee 100644 --- a/cs/test/SharedDirectoryTests.cs +++ b/cs/test/SharedDirectoryTests.cs @@ -153,7 +153,7 @@ private void Populate(FasterKV private void Test(FasterTestInstance fasterInstance, Guid checkpointToken) { var checkpointInfo = default(HybridLogRecoveryInfo); - Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory))); + checkpointInfo.Recover(checkpointToken, new LocalCheckpointManager(fasterInstance.CheckpointDirectory)); // Create array for reading var inputArray = new Input[numUniqueKeys];