diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs index d08beb2e5..846551c4f 100644 --- a/cs/src/core/Allocator/MallocFixedPageSize.cs +++ b/cs/src/core/Allocator/MallocFixedPageSize.cs @@ -470,10 +470,11 @@ public void Dispose() /// Public facing persistence API /// /// + /// /// - public void TakeCheckpoint(IDevice device, out ulong numBytes) + public void TakeCheckpoint(IDevice device, ulong start_offset, out ulong numBytes) { - BeginCheckpoint(device, 0UL, out numBytes); + BeginCheckpoint(device, start_offset, out numBytes); } /// @@ -563,9 +564,10 @@ public int GetPageSize() /// /// /// - public void Recover(IDevice device, int buckets, ulong numBytes) + /// + public void Recover(IDevice device, ulong offset, int buckets, ulong numBytes) { - BeginRecovery(device, 0UL, buckets, numBytes, out ulong numBytesRead); + BeginRecovery(device, offset, buckets, numBytes, out ulong numBytesRead); } /// diff --git a/cs/src/core/Index/Common/CheckpointSettings.cs b/cs/src/core/Index/Common/CheckpointSettings.cs index ba76f5016..6e20f6dbb 100644 --- a/cs/src/core/Index/Common/CheckpointSettings.cs +++ b/cs/src/core/Index/Common/CheckpointSettings.cs @@ -2,6 +2,8 @@ // Licensed under the MIT license. 
+using System; + namespace FASTER.core { /// @@ -27,13 +29,20 @@ public enum CheckpointType public class CheckpointSettings { /// - /// Directory where checkpoints are stored + /// Checkpoint manager /// - public string CheckpointDir = ""; + public ICheckpointManager CheckpointManager = null; /// /// Type of checkpoint /// public CheckpointType CheckPointType = CheckpointType.Snapshot; + + /// + /// Use specified directory for storing and retrieving checkpoints + /// This is a shortcut to providing the following: + /// CheckpointSettings.CheckpointManager = new LocalCheckpointManager(CheckpointDir) + /// + public string CheckpointDir = null; } } diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 16c289b0b..91910ea9f 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -106,139 +106,7 @@ internal class FasterExecutionContext : SerializedFasterExecutionContext } } - internal class DirectoryConfiguration - { - private readonly string checkpointDir; - public DirectoryConfiguration(string checkpointDir) - { - this.checkpointDir = checkpointDir; - } - - public const string index_base_folder = "index-checkpoints"; - public const string index_meta_file = "info"; - public const string hash_table_file = "ht"; - public const string overflow_buckets_file = "ofb"; - public const string snapshot_file = "snapshot"; - - public const string cpr_base_folder = "cpr-checkpoints"; - public const string cpr_meta_file = "info"; - - public void CreateIndexCheckpointFolder(Guid token) - { - var directory = GetIndexCheckpointFolder(token); - Directory.CreateDirectory(directory); - DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); - foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) - file.Delete(); - } - public void CreateHybridLogCheckpointFolder(Guid token) - { - var directory = GetHybridLogCheckpointFolder(token); - Directory.CreateDirectory(directory); - DirectoryInfo 
directoryInfo = new System.IO.DirectoryInfo(directory); - foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) - file.Delete(); - } - - public string GetIndexCheckpointFolder(Guid token = default(Guid)) - { - if (token != default(Guid)) - return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString()); - else - return GetMergedFolderPath(checkpointDir, index_base_folder); - } - - public string GetHybridLogCheckpointFolder(Guid token = default(Guid)) - { - if (token != default(Guid)) - return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString()); - else - return GetMergedFolderPath(checkpointDir, cpr_base_folder); - } - - public string GetIndexCheckpointMetaFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - index_meta_file, - ".dat"); - } - - public string GetPrimaryHashTableFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - hash_table_file, - ".dat"); - } - - public string GetOverflowBucketsFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - index_base_folder, - token.ToString(), - overflow_buckets_file, - ".dat"); - } - - public string GetHybridLogCheckpointMetaFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - cpr_meta_file, - ".dat"); - } - - public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - checkpointToken.ToString(), - sessionToken.ToString(), - ".dat"); - } - - public string GetHybridLogCheckpointFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - snapshot_file, - ".dat"); - } - - public string GetHybridLogObjectCheckpointFileName(Guid token) - { - return GetMergedFolderPath(checkpointDir, - cpr_base_folder, - token.ToString(), - snapshot_file, - 
".obj.dat"); - } - - public static string GetMergedFolderPath(params String[] paths) - { - String fullPath = paths[0]; - - for (int i = 1; i < paths.Length; i++) - { - if (i == paths.Length - 1 && paths[i].Contains(".")) - { - fullPath += paths[i]; - } - else - { - fullPath += Path.DirectorySeparatorChar + paths[i]; - } - } - - return fullPath; - } - } - + /// /// Recovery info for hybrid log /// @@ -280,10 +148,17 @@ public struct HybridLogRecoveryInfo /// Guid array /// public Guid[] guids; + + /// + /// Tokens per guid restored during Continue + /// + public ConcurrentDictionary continueTokens; + /// - /// Tokens per guid + /// Tokens per guid created during Checkpoint /// - public Dictionary continueTokens; + public ConcurrentDictionary checkpointTokens; + /// /// Object log segment offsets /// @@ -305,7 +180,8 @@ public void Initialize(Guid token, int _version) finalLogicalAddress = 0; headAddress = 0; guids = new Guid[LightEpoch.kTableSize + 1]; - continueTokens = new Dictionary(); + continueTokens = new ConcurrentDictionary(); + checkpointTokens = new ConcurrentDictionary(); objectLogSegmentOffsets = null; } @@ -316,7 +192,7 @@ public void Initialize(Guid token, int _version) public void Initialize(StreamReader reader) { guids = new Guid[LightEpoch.kTableSize + 1]; - continueTokens = new Dictionary(); + continueTokens = new ConcurrentDictionary(); string value = reader.ReadLine(); guid = Guid.Parse(value); @@ -346,6 +222,9 @@ public void Initialize(StreamReader reader) { value = reader.ReadLine(); guids[i] = Guid.Parse(value); + value = reader.ReadLine(); + var serialno = long.Parse(value); + continueTokens.TryAdd(guids[i], serialno); } // Read object log segment offsets @@ -362,51 +241,19 @@ public void Initialize(StreamReader reader) } } - /// - /// Recover info from token and checkpoint directory - /// - /// - /// - /// - public bool Recover(Guid token, string checkpointDir) - { - return Recover(token, new DirectoryConfiguration(checkpointDir)); - } - /// 
/// Recover info from token /// /// - /// + /// /// - internal bool Recover(Guid token, DirectoryConfiguration directoryConfiguration) + internal void Recover(Guid token, ICheckpointManager checkpointManager) { - string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(token); - using (var reader = new StreamReader(checkpointInfoFile)) - { - Initialize(reader); - } - - int num_threads = numThreads; - for (int i = 0; i < num_threads; i++) - { - var guid = guids[i]; - using (var reader = new StreamReader(directoryConfiguration.GetHybridLogCheckpointContextFileName(token, guid))) - { - var ctx = new SerializedFasterExecutionContext(); - ctx.Load(reader); - continueTokens.Add(ctx.guid, ctx.serialNum); - } - } + var metadata = checkpointManager.GetLogCommitMetadata(token); + if (metadata == null) + throw new Exception("Invalid log commit metadata for ID " + token.ToString()); - if (continueTokens.Count == num_threads) - { - return true; - } - else - { - return false; - } + Initialize(new StreamReader(new MemoryStream(metadata))); } /// @@ -418,32 +265,39 @@ public void Reset() } /// - /// Write info to file + /// Write info to byte array /// - /// - public void Write(StreamWriter writer) + public byte[] ToByteArray() { - writer.WriteLine(guid); - writer.WriteLine(useSnapshotFile); - writer.WriteLine(version); - writer.WriteLine(flushedLogicalAddress); - writer.WriteLine(startLogicalAddress); - writer.WriteLine(finalLogicalAddress); - writer.WriteLine(headAddress); - writer.WriteLine(numThreads); - for (int i = 0; i < numThreads; i++) + using (var ms = new MemoryStream()) { - writer.WriteLine(guids[i]); - } - - //Write object log segment offsets - writer.WriteLine(objectLogSegmentOffsets == null ? 
0 : objectLogSegmentOffsets.Length); - if (objectLogSegmentOffsets != null) - { - for (int i = 0; i < objectLogSegmentOffsets.Length; i++) + using (StreamWriter writer = new StreamWriter(ms)) { - writer.WriteLine(objectLogSegmentOffsets[i]); + writer.WriteLine(guid); + writer.WriteLine(useSnapshotFile); + writer.WriteLine(version); + writer.WriteLine(flushedLogicalAddress); + writer.WriteLine(startLogicalAddress); + writer.WriteLine(finalLogicalAddress); + writer.WriteLine(headAddress); + writer.WriteLine(numThreads); + for (int i = 0; i < numThreads; i++) + { + writer.WriteLine(guids[i]); + writer.WriteLine(checkpointTokens[guids[i]]); + } + + // Write object log segment offsets + writer.WriteLine(objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length); + if (objectLogSegmentOffsets != null) + { + for (int i = 0; i < objectLogSegmentOffsets.Length; i++) + { + writer.WriteLine(objectLogSegmentOffsets[i]); + } + } } + return ms.ToArray(); } } @@ -476,16 +330,19 @@ internal struct HybridLogCheckpointInfo public CountdownEvent flushed; public long started; - public void Initialize(Guid token, int _version) + public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager) { info.Initialize(token, _version); started = 0; + checkpointManager.InitializeLogCheckpoint(token); } - public void Recover(Guid token, DirectoryConfiguration directoryConfiguration) + + public void Recover(Guid token, ICheckpointManager checkpointManager) { - info.Recover(token, directoryConfiguration); + info.Recover(token, checkpointManager); started = 0; } + public void Reset() { started = 0; @@ -539,24 +396,34 @@ public void Initialize(StreamReader reader) value = reader.ReadLine(); finalLogicalAddress = long.Parse(value); } - public void Recover(Guid guid, DirectoryConfiguration directoryConfiguration) + + public void Recover(Guid guid, ICheckpointManager checkpointManager) { - string indexInfoFile = 
directoryConfiguration.GetIndexCheckpointMetaFileName(guid); - using (var reader = new StreamReader(indexInfoFile)) - { - Initialize(reader); - } + var metadata = checkpointManager.GetIndexCommitMetadata(guid); + if (metadata == null) + throw new Exception("Invalid index commit metadata for ID " + guid.ToString()); + Initialize(new StreamReader(new MemoryStream(metadata))); } - public void Write(StreamWriter writer) + + public byte[] ToByteArray() { - writer.WriteLine(token); - writer.WriteLine(table_size); - writer.WriteLine(num_ht_bytes); - writer.WriteLine(num_ofb_bytes); - writer.WriteLine(num_buckets); - writer.WriteLine(startLogicalAddress); - writer.WriteLine(finalLogicalAddress); + using (var ms = new MemoryStream()) + { + using (var writer = new StreamWriter(ms)) + { + + writer.WriteLine(token); + writer.WriteLine(table_size); + writer.WriteLine(num_ht_bytes); + writer.WriteLine(num_ofb_bytes); + writer.WriteLine(num_buckets); + writer.WriteLine(startLogicalAddress); + writer.WriteLine(finalLogicalAddress); + } + return ms.ToArray(); + } } + public void DebugPrint() { Debug.WriteLine("******** Index Checkpoint Info for {0} ********", token); @@ -583,23 +450,21 @@ internal struct IndexCheckpointInfo { public IndexRecoveryInfo info; public IDevice main_ht_device; - public IDevice ofb_device; - public void Initialize(Guid token, long _size, DirectoryConfiguration directoryConfiguration) + public void Initialize(Guid token, long _size, ICheckpointManager checkpointManager) { info.Initialize(token, _size); - main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false); - ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false); + checkpointManager.InitializeIndexCheckpoint(token); + main_ht_device = checkpointManager.GetIndexDevice(token); } - public void Recover(Guid token, DirectoryConfiguration directoryConfiguration) + public void Recover(Guid token, 
ICheckpointManager checkpointManager) { - info.Recover(token, directoryConfiguration); + info.Recover(token, checkpointManager); } public void Reset() { info.Reset(); main_ht_device.Close(); - ofb_device.Close(); } } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index e6082f613..1d7e58549 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -4,6 +4,7 @@ #pragma warning disable 0162 using System; +using System.Collections.Concurrent; using System.Runtime.CompilerServices; namespace FASTER.core @@ -65,7 +66,7 @@ private enum CheckpointType private HybridLogCheckpointInfo _hybridLogCheckpoint; - private SafeConcurrentDictionary _recoveredSessions; + private ConcurrentDictionary _recoveredSessions; private FastThreadLocal prevThreadCtx; private FastThreadLocal threadCtx; @@ -104,7 +105,10 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo if (checkpointSettings == null) checkpointSettings = new CheckpointSettings(); - directoryConfiguration = new DirectoryConfiguration(checkpointSettings.CheckpointDir); + if (checkpointSettings.CheckpointDir != null && checkpointSettings.CheckpointManager != null) + throw new Exception("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both"); + + checkpointManager = checkpointSettings.CheckpointManager ?? new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? 
""); FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; CopyReadsToTail = logSettings.CopyReadsToTail; diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs similarity index 91% rename from cs/src/core/Index/FASTER/Checkpoint.cs rename to cs/src/core/Index/Recovery/Checkpoint.cs index 4d27c1ae8..f17a8fc10 100644 --- a/cs/src/core/Index/FASTER/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -247,8 +247,6 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - WriteIndexMetaFile(); - WriteIndexCheckpointCompleteFile(); } _hybridLogCheckpoint.info.headAddress = hlog.HeadAddress; @@ -263,10 +261,8 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.finalLogicalAddress); - _hybridLogCheckpoint.snapshotFileDevice = Devices.CreateLogDevice - (directoryConfiguration.GetHybridLogCheckpointFileName(_hybridLogCheckpointToken), false); - _hybridLogCheckpoint.snapshotFileObjectLogDevice = Devices.CreateLogDevice - (directoryConfiguration.GetHybridLogObjectCheckpointFileName(_hybridLogCheckpointToken), false); + _hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken); + _hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken); _hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize()); _hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(hlog.GetSegmentSize()); @@ -294,7 +290,10 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta case Phase.PERSISTENCE_CALLBACK: { WriteHybridLogMetaInfo(); - WriteHybridLogCheckpointCompleteFile(); + + if 
(_checkpointType == CheckpointType.FULL) + WriteIndexMetaInfo(); + MakeTransition(intermediateState, nextState); break; } @@ -346,8 +345,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta { _indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress(); ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress); - WriteIndexMetaFile(); - WriteIndexCheckpointCompleteFile(); + WriteIndexMetaInfo(); _indexCheckpoint.Reset(); break; } @@ -526,7 +524,7 @@ private void HandleCheckpointingPhases() if (notify) { - WriteHybridLogContextInfo(); + _hybridLogCheckpoint.info.checkpointTokens.TryAdd(prevThreadCtx.Value.guid, prevThreadCtx.Value.serialNum); if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version)) { @@ -661,12 +659,12 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp case Phase.INDEX_CHECKPOINT: switch(type) { - case CheckpointType.INDEX_ONLY: - nextState.phase = Phase.REST; - break; case CheckpointType.FULL: nextState.phase = Phase.PREPARE; break; + default: + nextState.phase = Phase.REST; + break; } break; case Phase.PREPARE: @@ -701,56 +699,14 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp private void WriteHybridLogMetaInfo() { - string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(_hybridLogCheckpointToken); - using (var file = new StreamWriter(filename, false)) - { - _hybridLogCheckpoint.info.Write(file); - file.Flush(); - } + checkpointManager.CommitLogCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.ToByteArray()); } - private void WriteHybridLogCheckpointCompleteFile() + private void WriteIndexMetaInfo() { - string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(_hybridLogCheckpointToken); - completed_filename += Path.DirectorySeparatorChar + "completed.dat"; - using (var file = new StreamWriter(completed_filename, false)) - { 
- file.WriteLine(); - file.Flush(); - } + checkpointManager.CommitIndexCheckpoint(_indexCheckpointToken, _indexCheckpoint.info.ToByteArray()); } - private void WriteHybridLogContextInfo() - { - string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.Value.guid); - using (var file = new StreamWriter(filename, false)) - { - prevThreadCtx.Value.Write(file); - file.Flush(); - } - - } - - private void WriteIndexMetaFile() - { - string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(_indexCheckpointToken); - using (var file = new StreamWriter(filename, false)) - { - _indexCheckpoint.info.Write(file); - file.Flush(); - } - } - - private void WriteIndexCheckpointCompleteFile() - { - string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(_indexCheckpointToken); - completed_filename += Path.DirectorySeparatorChar + "completed.dat"; - using (var file = new StreamWriter(completed_filename, false)) - { - file.WriteLine(); - file.Flush(); - } - } private bool ObtainCurrentTailAddress(ref long location) { var tailAddress = hlog.GetTailAddress(); @@ -759,14 +715,12 @@ private bool ObtainCurrentTailAddress(ref long location) private void InitializeIndexCheckpoint(Guid indexToken) { - directoryConfiguration.CreateIndexCheckpointFolder(indexToken); - _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, directoryConfiguration); + _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, checkpointManager); } private void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) { - directoryConfiguration.CreateHybridLogCheckpointFolder(hybridLogToken); - _hybridLogCheckpoint.Initialize(hybridLogToken, version); + _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); } #endregion diff --git a/cs/src/core/Index/Recovery/DirectoryConfiguration.cs b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs new file mode 100644 index 
000000000..5868b2172 --- /dev/null +++ b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.IO; + +namespace FASTER.core +{ + class DirectoryConfiguration + { + private readonly string checkpointDir; + + public DirectoryConfiguration(string checkpointDir) + { + this.checkpointDir = checkpointDir; + } + + public const string index_base_folder = "index-checkpoints"; + public const string index_meta_file = "info"; + public const string hash_table_file = "ht"; + public const string overflow_buckets_file = "ofb"; + public const string snapshot_file = "snapshot"; + + public const string cpr_base_folder = "cpr-checkpoints"; + public const string cpr_meta_file = "info"; + + public void CreateIndexCheckpointFolder(Guid token) + { + var directory = GetIndexCheckpointFolder(token); + Directory.CreateDirectory(directory); + DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); + foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) + file.Delete(); + } + public void CreateHybridLogCheckpointFolder(Guid token) + { + var directory = GetHybridLogCheckpointFolder(token); + Directory.CreateDirectory(directory); + DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory); + foreach (System.IO.FileInfo file in directoryInfo.GetFiles()) + file.Delete(); + } + + public string GetIndexCheckpointFolder(Guid token = default(Guid)) + { + if (token != default(Guid)) + return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString()); + else + return GetMergedFolderPath(checkpointDir, index_base_folder); + } + + public string GetHybridLogCheckpointFolder(Guid token = default(Guid)) + { + if (token != default(Guid)) + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString()); + else + return GetMergedFolderPath(checkpointDir, cpr_base_folder); + } + + public string 
GetIndexCheckpointMetaFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + index_meta_file, + ".dat"); + } + + public string GetPrimaryHashTableFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + hash_table_file, + ".dat"); + } + + public string GetOverflowBucketsFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + index_base_folder, + token.ToString(), + overflow_buckets_file, + ".dat"); + } + + public string GetHybridLogCheckpointMetaFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, + cpr_base_folder, + token.ToString(), + cpr_meta_file, + ".dat"); + } + + public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken) + { + return GetMergedFolderPath(checkpointDir, + cpr_base_folder, + checkpointToken.ToString(), + sessionToken.ToString(), + ".dat"); + } + + public string GetLogSnapshotFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".dat"); + } + + public string GetObjectLogSnapshotFileName(Guid token) + { + return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".obj.dat"); + } + + private static string GetMergedFolderPath(params String[] paths) + { + String fullPath = paths[0]; + + for (int i = 1; i < paths.Length; i++) + { + if (i == paths.Length - 1 && paths[i].Contains(".")) + { + fullPath += paths[i]; + } + else + { + fullPath += Path.DirectorySeparatorChar + paths[i]; + } + } + + return fullPath; + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs new file mode 100644 index 000000000..947d06df4 --- /dev/null +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. 
+// Licensed under the MIT license. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Interface for users to control creation and retrieval of checkpoint-related data + /// FASTER calls this interface during checkpoint/recovery in this sequence: + /// + /// Checkpoint: + /// InitializeIndexCheckpoint (for index checkpoints) -> + /// GetIndexDevice (for index checkpoints) -> + /// InitializeLogCheckpoint (for log checkpoints) -> + /// GetSnapshotLogDevice (for log checkpoints in snapshot mode) -> + /// GetSnapshotObjectLogDevice (for log checkpoints in snapshot mode with objects) -> + /// CommitLogCheckpoint (for log checkpoints) -> + /// CommitIndexCheckpoint (for index checkpoints) -> + /// + /// Recovery: + /// GetLatestCheckpoint (if request to recover to latest checkpoint) -> + /// GetIndexCommitMetadata -> + /// GetLogCommitMetadata -> + /// GetIndexDevice -> + /// GetSnapshotLogDevice (for recovery in snapshot mode) -> + /// GetSnapshotObjectLogDevice (for recovery in snapshot mode with objects) + /// + /// Provided devices will be closed directly by FASTER when done. 
+ /// + public interface ICheckpointManager + { + /// + /// Initialize index checkpoint + /// + /// + void InitializeIndexCheckpoint(Guid indexToken); + + /// + /// Initialize log checkpoint (snapshot and fold-over) + /// + /// + void InitializeLogCheckpoint(Guid logToken); + + /// + /// Commit index checkpoint + /// + /// + /// + /// + void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata); + + /// + /// Commit log checkpoint (snapshot and fold-over) + /// + /// + /// + /// + void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata); + + /// + /// Retrieve commit metadata for specified index checkpoint + /// + /// Token + /// Metadata, or null if invalid + byte[] GetIndexCommitMetadata(Guid indexToken); + + /// + /// Retrieve commit metadata for specified log checkpoint + /// + /// Token + /// Metadata, or null if invalid + byte[] GetLogCommitMetadata(Guid logToken); + + /// + /// Provide device to store index checkpoint (including overflow buckets) + /// + /// + /// + IDevice GetIndexDevice(Guid indexToken); + + /// + /// Provide device to store snapshot of log (required only for snapshot checkpoints) + /// + /// + /// + IDevice GetSnapshotLogDevice(Guid token); + + /// + /// Provide device to store snapshot of object log (required only for snapshot checkpoints) + /// + /// + /// + IDevice GetSnapshotObjectLogDevice(Guid token); + + /// + /// Get latest valid checkpoint for recovery + /// + /// + /// + /// true if latest valid checkpoint found, false otherwise + bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken); + } +} \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/IndexCheckpoint.cs b/cs/src/core/Index/Recovery/IndexCheckpoint.cs similarity index 89% rename from cs/src/core/Index/FASTER/IndexCheckpoint.cs rename to cs/src/core/Index/Recovery/IndexCheckpoint.cs index 0ec78b3ad..b0d228f5f 100644 --- a/cs/src/core/Index/FASTER/IndexCheckpoint.cs +++ b/cs/src/core/Index/Recovery/IndexCheckpoint.cs @@ -28,9 +28,10 @@ 
internal void TakeIndexFuzzyCheckpoint() TakeMainIndexCheckpoint(ht_version, _indexCheckpoint.main_ht_device, out ulong ht_num_bytes_written); - overflowBucketsAllocator.TakeCheckpoint( - _indexCheckpoint.ofb_device, - out ulong ofb_num_bytes_written); + + var sectorSize = _indexCheckpoint.main_ht_device.SectorSize; + var alignedIndexSize = (uint)((ht_num_bytes_written + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.TakeCheckpoint(_indexCheckpoint.main_ht_device, alignedIndexSize, out ulong ofb_num_bytes_written); _indexCheckpoint.info.num_ht_bytes = ht_num_bytes_written; _indexCheckpoint.info.num_ofb_bytes = ofb_num_bytes_written; } @@ -40,7 +41,9 @@ internal void TakeIndexFuzzyCheckpoint(int ht_version, IDevice device, out ulong ofbnumBytesWritten, out int num_ofb_buckets) { TakeMainIndexCheckpoint(ht_version, device, out numBytesWritten); - overflowBucketsAllocator.TakeCheckpoint(ofbdevice, out ofbnumBytesWritten); + var sectorSize = device.SectorSize; + var alignedIndexSize = (uint)((numBytesWritten + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.TakeCheckpoint(ofbdevice, alignedIndexSize, out ofbnumBytesWritten); num_ofb_buckets = overflowBucketsAllocator.GetMaxValidAddress(); } diff --git a/cs/src/core/Index/FASTER/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs similarity index 87% rename from cs/src/core/Index/FASTER/IndexRecovery.cs rename to cs/src/core/Index/Recovery/IndexRecovery.cs index fca7d230d..d1f0bb918 100644 --- a/cs/src/core/Index/FASTER/IndexRecovery.cs +++ b/cs/src/core/Index/Recovery/IndexRecovery.cs @@ -19,7 +19,7 @@ namespace FASTER.core /// public unsafe partial class FasterBase { - internal DirectoryConfiguration directoryConfiguration; + internal ICheckpointManager checkpointManager; // Derived class exposed API internal void RecoverFuzzyIndex(IndexCheckpointInfo info) @@ -29,24 +29,22 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info) Debug.Assert(state[ht_version].size 
== info.info.table_size); // Create devices to read from using Async API - info.main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false); - info.ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false); + info.main_ht_device = checkpointManager.GetIndexDevice(token); BeginMainIndexRecovery(ht_version, info.main_ht_device, info.info.num_ht_bytes); - overflowBucketsAllocator.Recover( - info.ofb_device, - info.info.num_buckets, - info.info.num_ofb_bytes); + var sectorSize = info.main_ht_device.SectorSize; + var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1)); + + overflowBucketsAllocator.Recover(info.main_ht_device, alignedIndexSize, info.info.num_buckets, info.info.num_ofb_bytes); // Wait until reading is complete IsFuzzyIndexRecoveryComplete(true); // close index checkpoint files appropriately info.main_ht_device.Close(); - info.ofb_device.Close(); // Delete all tentative entries! 
DeleteTentativeEntries(); @@ -55,7 +53,9 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info) internal void RecoverFuzzyIndex(int ht_version, IDevice device, ulong num_ht_bytes, IDevice ofbdevice, int num_buckets, ulong num_ofb_bytes) { BeginMainIndexRecovery(ht_version, device, num_ht_bytes); - overflowBucketsAllocator.Recover(ofbdevice, num_buckets, num_ofb_bytes); + var sectorSize = device.SectorSize; + var alignedIndexSize = (uint)((num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1)); + overflowBucketsAllocator.Recover(ofbdevice, alignedIndexSize, num_buckets, num_ofb_bytes); } internal bool IsFuzzyIndexRecoveryComplete(bool waitUntilComplete = false) diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs new file mode 100644 index 000000000..9eb270d95 --- /dev/null +++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs @@ -0,0 +1,206 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace FASTER.core
+{
+    /// <summary>
+    /// Implementation of checkpoint interface for local file storage
+    /// </summary>
+    public class LocalCheckpointManager : ICheckpointManager
+    {
+        private readonly DirectoryConfiguration directoryConfiguration;
+
+        /// <summary>
+        /// Create new instance of local checkpoint manager at given base directory
+        /// </summary>
+        /// <param name="CheckpointDir">Base directory passed to the directory configuration</param>
+        public LocalCheckpointManager(string CheckpointDir)
+        {
+            directoryConfiguration = new DirectoryConfiguration(CheckpointDir);
+        }
+
+        /// <summary>
+        /// Initialize index checkpoint by creating its checkpoint folder
+        /// </summary>
+        /// <param name="indexToken">Token identifying the index checkpoint</param>
+        public void InitializeIndexCheckpoint(Guid indexToken)
+        {
+            directoryConfiguration.CreateIndexCheckpointFolder(indexToken);
+        }
+
+        /// <summary>
+        /// Initialize log checkpoint (snapshot and fold-over) by creating its checkpoint folder
+        /// </summary>
+        /// <param name="logToken">Token identifying the log checkpoint</param>
+        public void InitializeLogCheckpoint(Guid logToken)
+        {
+            directoryConfiguration.CreateHybridLogCheckpointFolder(logToken);
+        }
+
+        /// <summary>
+        /// Commit index checkpoint: write length-prefixed metadata, then create the completed.dat marker
+        /// </summary>
+        /// <param name="indexToken">Token identifying the index checkpoint</param>
+        /// <param name="commitMetadata">Metadata bytes, stored after a 4-byte length prefix</param>
+        public void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
+        {
+            string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken);
+            using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create)))
+            {
+                writer.Write(commitMetadata.Length);
+                writer.Write(commitMetadata);
+                writer.Flush();
+            }
+
+            string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(indexToken);
+            completed_filename += Path.DirectorySeparatorChar + "completed.dat";
+            using (var file = new FileStream(completed_filename, FileMode.Create))
+            {
+                file.Flush();
+            }
+        }
+
+        /// <summary>
+        /// Commit log checkpoint (snapshot and fold-over): write length-prefixed metadata, then create the completed.dat marker
+        /// </summary>
+        /// <param name="logToken">Token identifying the log checkpoint</param>
+        /// <param name="commitMetadata">Metadata bytes, stored after a 4-byte length prefix</param>
+        public void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
+        {
+            string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken);
+            using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create)))
+            {
+                writer.Write(commitMetadata.Length);
+                writer.Write(commitMetadata);
+                writer.Flush();
+            }
+
+            string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(logToken);
+            completed_filename += Path.DirectorySeparatorChar + "completed.dat";
+            using (var file = new FileStream(completed_filename, FileMode.Create))
+            {
+                file.Flush();
+            }
+        }
+
+        /// <summary>
+        /// Retrieve commit metadata for specified index checkpoint
+        /// </summary>
+        /// <param name="indexToken">Token</param>
+        /// <returns>Metadata, or null if invalid (no completed.dat marker in the checkpoint folder)</returns>
+        public byte[] GetIndexCommitMetadata(Guid indexToken)
+        {
+            var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(indexToken));
+            if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+                return null;
+
+            string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken);
+            using (var reader = new BinaryReader(new FileStream(filename, FileMode.Open)))
+            {
+                var len = reader.ReadInt32();
+                return reader.ReadBytes(len);
+            }
+        }
+
+        /// <summary>
+        /// Retrieve commit metadata for specified log checkpoint
+        /// </summary>
+        /// <param name="logToken">Token</param>
+        /// <returns>Metadata, or null if invalid (no completed.dat marker in the checkpoint folder)</returns>
+        public byte[] GetLogCommitMetadata(Guid logToken)
+        {
+            var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(logToken));
+            if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+                return null;
+
+            string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken);
+            using (var reader = new BinaryReader(new FileStream(checkpointInfoFile, FileMode.Open)))
+            {
+                var len = reader.ReadInt32();
+                return reader.ReadBytes(len);
+            }
+        }
+
+        /// <summary>
+        /// Provide device to store index checkpoint (including overflow buckets)
+        /// </summary>
+        /// <param name="indexToken">Token identifying the index checkpoint</param>
+        /// <returns>Log device created over the primary hash table file of the checkpoint</returns>
+        public IDevice GetIndexDevice(Guid indexToken)
+        {
+            return Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(indexToken), false);
+        }
+
+        /// <summary>
+        /// Provide device to store snapshot of log (required only for snapshot checkpoints)
+        /// </summary>
+        /// <param name="token">Token identifying the log checkpoint</param>
+        /// <returns>Log device created over the log snapshot file of the checkpoint</returns>
+        public IDevice GetSnapshotLogDevice(Guid token)
+        {
+            return Devices.CreateLogDevice(directoryConfiguration.GetLogSnapshotFileName(token), false);
+        }
+
+        /// <summary>
+        /// Provide device to store snapshot of object log (required only for snapshot checkpoints)
+        /// </summary>
+        /// <param name="token">Token identifying the log checkpoint</param>
+        /// <returns>Log device created over the object log snapshot file of the checkpoint</returns>
+        public IDevice GetSnapshotObjectLogDevice(Guid token)
+        {
+            return Devices.CreateLogDevice(directoryConfiguration.GetObjectLogSnapshotFileName(token), false);
+        }
+
+        /// <summary>
+        /// Get latest valid checkpoint for recovery (folders lacking completed.dat are deleted first)
+        /// </summary>
+        /// <param name="indexToken">Receives the token parsed from the most recently written index checkpoint folder</param>
+        /// <param name="logToken">Receives the token parsed from the most recently written log checkpoint folder</param>
+        /// <returns>True; an exception is thrown when no valid index or log checkpoint folder is found</returns>
+        public bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken)
+        {
+            var indexCheckpointDir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder());
+            var dirs = indexCheckpointDir.GetDirectories();
+            foreach (var dir in dirs)
+            {
+                // Remove incomplete checkpoints
+                if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+                {
+                    Directory.Delete(dir.FullName, true);
+                }
+            }
+            var latestICFolder = indexCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).FirstOrDefault(); // null when no folder is left
+            if (latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out indexToken))
+            {
+                throw new Exception("No valid index checkpoint to recover from");
+            }
+
+
+            var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder());
+            dirs = hlogCheckpointDir.GetDirectories();
+            foreach (var dir in dirs)
+            {
+                // Remove incomplete checkpoints
+                if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+                {
+                    Directory.Delete(dir.FullName, true);
+                }
+            }
+            var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).FirstOrDefault(); // null when no folder is left
+            if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out logToken))
+            {
+                throw new Exception("No valid hybrid log checkpoint to recover from");
+            }
+            return true;
+        }
+    }
+}
\ No newline at end of file
diff --git
a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs similarity index 83% rename from cs/src/core/Index/FASTER/Recovery.cs rename to cs/src/core/Index/Recovery/Recovery.cs index 72cf67e53..81bf73da4 100644 --- a/cs/src/core/Index/FASTER/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -62,66 +62,10 @@ public unsafe partial class FasterKV f.LastWriteTime).First(); - if(latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out Guid indexCheckpointGuid)) - { - throw new Exception("No valid index checkpoint to recover from"); - } - - - var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder()); - dirs = hlogCheckpointDir.GetDirectories(); - foreach (var dir in dirs) - { - // Remove incomplete checkpoints - if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat")) - { - Directory.Delete(dir.FullName, true); - } - } - var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First(); - if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out Guid hybridLogCheckpointGuid)) - { - throw new Exception("No valid hybrid log checkpoint to recover from"); - } - + checkpointManager.GetLatestCheckpoint(out Guid indexCheckpointGuid, out Guid hybridLogCheckpointGuid); InternalRecover(indexCheckpointGuid, hybridLogCheckpointGuid); } - private bool IsCheckpointSafe(Guid token, CheckpointType checkpointType) - { - switch (checkpointType) - { - case CheckpointType.INDEX_ONLY: - { - var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(token)); - return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"); - } - case CheckpointType.HYBRID_LOG_ONLY: - { - var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(token)); - return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"); - } - case CheckpointType.FULL: - { - return 
IsCheckpointSafe(token, CheckpointType.INDEX_ONLY) - && IsCheckpointSafe(token, CheckpointType.HYBRID_LOG_ONLY); - } - default: - return false; - } - } - private bool IsCompatible(IndexRecoveryInfo indexInfo, HybridLogRecoveryInfo recoveryInfo) { var l1 = indexInfo.finalLogicalAddress; @@ -135,20 +79,13 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) Debug.WriteLine("Index Checkpoint: {0}", indexToken); Debug.WriteLine("HybridLog Checkpoint: {0}", hybridLogToken); - // Assert corresponding checkpoints are safe to recover from - Debug.Assert(IsCheckpointSafe(indexToken, CheckpointType.INDEX_ONLY), - "Cannot recover from incomplete index checkpoint " + indexToken.ToString()); - - Debug.Assert(IsCheckpointSafe(hybridLogToken, CheckpointType.HYBRID_LOG_ONLY), - "Cannot recover from incomplete hybrid log checkpoint " + hybridLogToken.ToString()); - // Recovery appropriate context information var recoveredICInfo = new IndexCheckpointInfo(); - recoveredICInfo.Recover(indexToken, directoryConfiguration); + recoveredICInfo.Recover(indexToken, checkpointManager); recoveredICInfo.info.DebugPrint(); var recoveredHLCInfo = new HybridLogCheckpointInfo(); - recoveredHLCInfo.Recover(hybridLogToken, directoryConfiguration); + recoveredHLCInfo.Recover(hybridLogToken, checkpointManager); recoveredHLCInfo.info.DebugPrint(); // Check if the two checkpoints are compatible for recovery @@ -187,12 +124,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress); // Recover session information - _recoveredSessions = new SafeConcurrentDictionary(); - foreach(var sessionInfo in recoveredHLCInfo.info.continueTokens) - { - - _recoveredSessions.GetOrAdd(sessionInfo.Key, sessionInfo.Value); - } + _recoveredSessions = recoveredHLCInfo.info.continueTokens; } private void RestoreHybridLog(long untilAddress, long headAddress) @@ -338,8 +270,8 @@ private void 
RecoverHybridLogFromSnapshotFile( // By default first page has one extra record var capacity = hlog.GetCapacityNumPages(); - var recoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogCheckpointFileName(recoveryInfo.guid), false); - var objectLogRecoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogObjectCheckpointFileName(recoveryInfo.guid), false); + var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid); + var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid); recoveryDevice.Initialize(hlog.GetSegmentSize()); objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize()); var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress) diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/FullRecoveryTests.cs index 12d91aded..b07f129be 100644 --- a/cs/test/FullRecoveryTests.cs +++ b/cs/test/FullRecoveryTests.cs @@ -170,7 +170,7 @@ public void RecoverAndTest(Guid cprVersion, Guid indexVersion) // Test outputs var checkpointInfo = default(HybridLogRecoveryInfo); - checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path)); + checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path)); // Compute expected array long[] expected = new long[numUniqueKeys]; diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs index 670afbce7..10287f21d 100644 --- a/cs/test/ObjectRecoveryTest.cs +++ b/cs/test/ObjectRecoveryTest.cs @@ -189,7 +189,7 @@ public unsafe void RecoverAndTest(Guid cprVersion, Guid indexVersion) // Test outputs var checkpointInfo = default(HybridLogRecoveryInfo); - checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path)); + checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path)); // Compute expected array long[] expected = new long[numUniqueKeys]; diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs index 849956bce..7c4e5c9ee 100644 --- 
a/cs/test/SharedDirectoryTests.cs +++ b/cs/test/SharedDirectoryTests.cs @@ -153,7 +153,7 @@ private void Populate(FasterKV private void Test(FasterTestInstance fasterInstance, Guid checkpointToken) { var checkpointInfo = default(HybridLogRecoveryInfo); - Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory))); + checkpointInfo.Recover(checkpointToken, new LocalCheckpointManager(fasterInstance.CheckpointDirectory)); // Create array for reading var inputArray = new Input[numUniqueKeys];