diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 2abdd0a75..f7e119a9a 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -1,5 +1,6 @@
variables:
solution: 'cs/FASTER.sln'
+ RunAzureTests: 'yes'
jobs:
- job: 'csharpWindows'
@@ -37,6 +38,18 @@ jobs:
solution: '$(solution)'
platform: '$(buildPlatform)'
configuration: '$(buildConfiguration)'
+
+ - powershell: 'Invoke-WebRequest -OutFile azure-storage-emulator.msi -Uri "https://go.microsoft.com/fwlink/?LinkId=717179&clcid=0x409"'
+ displayName: 'Download Azure Storage Emulator'
+
+ - powershell: 'msiexec /passive /lvx installation.log /a azure-storage-emulator.msi TARGETDIR="C:\storage-emulator"'
+ displayName: 'Install Azure Storage Emulator'
+
+ - script: '"C:\Program Files\Microsoft SQL Server\130\Tools\Binn\SqlLocalDB.exe" create "v13.0" 13.0 -s'
+ displayName: 'Init Test Db'
+
+ - script: '"C:\storage-emulator\root\Microsoft SDKs\Azure\Storage Emulator\AzureStorageEmulator.exe" start'
+ displayName: 'Start Storage Emulator'
- task: VSTest@2
inputs:
diff --git a/cs/FASTER.sln b/cs/FASTER.sln
index a724440c6..af8a268e5 100644
--- a/cs/FASTER.sln
+++ b/cs/FASTER.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.27004.2008
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29102.190
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.benchmark", "benchmark\FASTER.benchmark.csproj", "{33A732D1-2B58-4FEE-9696-B9483496229F}"
EndProject
@@ -12,6 +12,9 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmark", "benchmark", "{CA6AB459-A31A-4C15-B1A6-A82C349B54B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{81B3B5D1-70F6-4979-AC76-003F9A6B316B}"
+ ProjectSection(SolutionItems) = preProject
+ src\core\FASTER.core.nuspec = src\core\FASTER.core.nuspec
+ EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SumStore", "playground\SumStore\SumStore.csproj", "{05D61B37-9714-4234-9961-384A63F7175E}"
EndProject
@@ -35,6 +38,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ClassCacheMT", "playground\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VarLenStructSample", "playground\VarLenStructSample\VarLenStructSample.csproj", "{37B3C501-A7A1-4E86-B766-22F9BEF31DFE}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FixedLenStructSample", "playground\FixedLenStructSample\FixedLenStructSample.csproj", "{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B14415-D316-4955-BE5F-725BB2DEBEBE}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -131,6 +140,22 @@ Global
{37B3C501-A7A1-4E86-B766-22F9BEF31DFE}.Release|Any CPU.Build.0 = Release|x64
{37B3C501-A7A1-4E86-B766-22F9BEF31DFE}.Release|x64.ActiveCfg = Release|x64
{37B3C501-A7A1-4E86-B766-22F9BEF31DFE}.Release|x64.Build.0 = Release|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Debug|Any CPU.ActiveCfg = Debug|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Debug|Any CPU.Build.0 = Debug|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Debug|x64.ActiveCfg = Debug|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Debug|x64.Build.0 = Debug|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|Any CPU.ActiveCfg = Release|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|Any CPU.Build.0 = Release|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.ActiveCfg = Release|x64
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.Build.0 = Release|x64
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.ActiveCfg = Debug|x64
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.Build.0 = Debug|x64
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
+ {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -148,6 +173,9 @@ Global
{079F8DF4-96D4-41AC-AD04-308FDF70E371} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{F989FF23-5DD7-4D8F-9458-BDA22EFC038D} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{37B3C501-A7A1-4E86-B766-22F9BEF31DFE} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
+ {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
+ {A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
+ {E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
diff --git a/cs/playground/FixedLenStructSample/App.config b/cs/playground/FixedLenStructSample/App.config
new file mode 100644
index 000000000..d69a9b153
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/App.config
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/cs/playground/FixedLenStructSample/FixedLenStructSample.csproj b/cs/playground/FixedLenStructSample/FixedLenStructSample.csproj
new file mode 100644
index 000000000..7b8c2eee1
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/FixedLenStructSample.csproj
@@ -0,0 +1,38 @@
+
+
+
+ net46
+ x64
+ win7-x64
+
+
+
+ Exe
+ true
+ StructSample
+ prompt
+ PackageReference
+ true
+
+
+
+ TRACE;DEBUG
+ full
+ true
+ bin\x64\Debug\
+
+
+ TRACE
+ pdbonly
+ true
+ bin\x64\Release\
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cs/playground/FixedLenStructSample/Functions.cs b/cs/playground/FixedLenStructSample/Functions.cs
new file mode 100644
index 000000000..89c008643
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/Functions.cs
@@ -0,0 +1,66 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using System;
+
+namespace FixedLenStructSample
+{
+ ///
+ /// Callback functions for FASTER operations
+ ///
+ public class FixedLenFunctions : IFunctions
+ {
+ public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
+ {
+ }
+
+ public void ConcurrentReader(ref FixedLenKey key, ref string input, ref FixedLenValue value, ref string dst)
+ {
+ dst = value.ToString();
+ }
+
+ public void ConcurrentWriter(ref FixedLenKey key, ref FixedLenValue src, ref FixedLenValue dst)
+ {
+ src.CopyTo(ref dst);
+ }
+
+ public void CopyUpdater(ref FixedLenKey key, ref string input, ref FixedLenValue oldValue, ref FixedLenValue newValue)
+ {
+ }
+
+ public void DeleteCompletionCallback(ref FixedLenKey key, Empty ctx)
+ {
+ }
+
+ public void InitialUpdater(ref FixedLenKey key, ref string input, ref FixedLenValue value)
+ {
+ }
+
+ public void InPlaceUpdater(ref FixedLenKey key, ref string input, ref FixedLenValue value)
+ {
+ }
+
+ public void ReadCompletionCallback(ref FixedLenKey key, ref string input, ref string output, Empty ctx, Status status)
+ {
+ }
+
+ public void RMWCompletionCallback(ref FixedLenKey key, ref string input, Empty ctx, Status status)
+ {
+ }
+
+ public void SingleReader(ref FixedLenKey key, ref string input, ref FixedLenValue value, ref string dst)
+ {
+ dst = value.ToString();
+ }
+
+ public void SingleWriter(ref FixedLenKey key, ref FixedLenValue src, ref FixedLenValue dst)
+ {
+ src.CopyTo(ref dst);
+ }
+
+ public void UpsertCompletionCallback(ref FixedLenKey key, ref FixedLenValue value, Empty ctx)
+ {
+ }
+ }
+}
diff --git a/cs/playground/FixedLenStructSample/Program.cs b/cs/playground/FixedLenStructSample/Program.cs
new file mode 100644
index 000000000..a78a531aa
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/Program.cs
@@ -0,0 +1,57 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using System;
+
+namespace FixedLenStructSample
+{
+ public class Program
+ {
+ // This sample uses fixed length structs for keys and values
+ static void Main()
+ {
+ var log = Devices.CreateLogDevice("hlog.log", deleteOnClose: true);
+ var fht = new FasterKV
+ (128, new FixedLenFunctions(),
+ new LogSettings { LogDevice = log, MemorySizeBits = 17, PageSizeBits = 12 }
+ );
+ fht.StartSession();
+
+ var key = new FixedLenKey("foo");
+ var value = new FixedLenValue("bar");
+
+ var status = fht.Upsert(ref key, ref value, Empty.Default, 0);
+
+ if (status != Status.OK)
+ Console.WriteLine("FixedLenStructSample: Error!");
+
+ var input = default(string); // unused
+ var output = default(string);
+
+ key = new FixedLenKey("xyz");
+ status = fht.Read(ref key, ref input, ref output, Empty.Default, 0);
+
+ if (status != Status.NOTFOUND)
+ Console.WriteLine("FixedLenStructSample: Error!");
+
+ key = new FixedLenKey("foo");
+ status = fht.Read(ref key, ref input, ref output, Empty.Default, 0);
+
+ if (status != Status.OK)
+ Console.WriteLine("FixedLenStructSample: Error!");
+
+ if (output.Equals(value.ToString()))
+ Console.WriteLine("FixedLenStructSample: Success!");
+ else
+ Console.WriteLine("FixedLenStructSample: Error!");
+
+ fht.StopSession();
+ fht.Dispose();
+ log.Close();
+
+ Console.WriteLine("Press to end");
+ Console.ReadLine();
+ }
+ }
+}
diff --git a/cs/playground/FixedLenStructSample/Properties/AssemblyInfo.cs b/cs/playground/FixedLenStructSample/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..1ac797463
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/Properties/AssemblyInfo.cs
@@ -0,0 +1,21 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
diff --git a/cs/playground/FixedLenStructSample/Types.cs b/cs/playground/FixedLenStructSample/Types.cs
new file mode 100644
index 000000000..eaad002d2
--- /dev/null
+++ b/cs/playground/FixedLenStructSample/Types.cs
@@ -0,0 +1,137 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using System;
+using System.Runtime.InteropServices;
+
+namespace FixedLenStructSample
+{
+ ///
+ /// Represents a fixed length struct type (Length = 32)
+ ///
+ [StructLayout(LayoutKind.Explicit, Size = Length)]
+ public unsafe struct FixedLenKey : IFasterEqualityComparer
+ {
+ private const int Length = 32;
+
+ [FieldOffset(0)]
+ private byte data;
+
+ public FixedLenKey(string v)
+ {
+ if (Length < 2 * (v.Length + 1))
+ throw new Exception("Insufficient space to store string");
+
+ fixed (byte* ptr = &data)
+ {
+ var data = (char*)ptr;
+ for (var i = 0; i < Length / sizeof(char); i++)
+ {
+ if (i < v.Length)
+ *(data + i) = v[i];
+ else
+ *(data + i) = '\0';
+ }
+ }
+ }
+
+ public void CopyTo(ref FixedLenKey dst)
+ {
+ fixed (byte* source = &data, destination = &dst.data)
+ Buffer.MemoryCopy(source, destination, Length, Length);
+ }
+
+ public bool Equals(ref FixedLenKey k1, ref FixedLenKey k2)
+ {
+ fixed (byte* pk1 = &k1.data, pk2 = &k2.data)
+ {
+ for (var i = 0; i < Length; i++)
+ {
+ var left = *(pk1 + i);
+ var right = *(pk2 + i);
+ if (left != right)
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public long GetHashCode64(ref FixedLenKey k)
+ {
+ fixed (byte* data = &k.data)
+ return Utility.HashBytes(data, Length);
+ }
+
+ public override string ToString()
+ {
+ fixed (byte* ptr = &data)
+ return new string((char*)ptr);
+ }
+ }
+
+ ///
+ /// Represents a fixed length struct type (Length = 64)
+ ///
+ [StructLayout(LayoutKind.Explicit, Size = Length)]
+ public unsafe struct FixedLenValue : IFasterEqualityComparer
+ {
+ private const int Length = 64;
+
+ [FieldOffset(0)]
+ private byte data;
+
+ public FixedLenValue(string v)
+ {
+ if (Length < 2 * (v.Length + 1))
+ throw new Exception("Insufficient space to store string");
+
+ fixed (byte* ptr = &data)
+ {
+ var data = (char*)ptr;
+ for (var i = 0; i < Length / sizeof(char); i++)
+ {
+ if (i < v.Length)
+ *(data + i) = v[i];
+ else
+ *(data + i) = '\0';
+ }
+ }
+ }
+
+ public void CopyTo(ref FixedLenValue dst)
+ {
+ fixed (byte* source = &data, destination = &dst.data)
+ Buffer.MemoryCopy(source, destination, Length, Length);
+ }
+
+ public bool Equals(ref FixedLenValue k1, ref FixedLenValue k2)
+ {
+ fixed (byte* pk1 = &k1.data, pk2 = &k2.data)
+ {
+ for (var i = 0; i < Length; i++)
+ {
+ var left = *(pk1 + i);
+ var right = *(pk2 + i);
+ if (left != right)
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public long GetHashCode64(ref FixedLenValue k)
+ {
+ fixed (byte* data = &k.data)
+ return Utility.HashBytes(data, Length);
+ }
+
+ public override string ToString()
+ {
+ fixed (byte* ptr = &data)
+ return new string((char*)ptr);
+ }
+ }
+}
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 558ac6f52..f69fdec03 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -329,7 +329,7 @@ public unsafe abstract class AllocatorBase : IDisposable
/// Allocate page
///
///
- protected abstract void AllocatePage(int index);
+ internal abstract void AllocatePage(int index);
///
/// Whether page is allocated
///
diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs
index 85736465b..ef1a9a773 100644
--- a/cs/src/core/Allocator/BlittableAllocator.cs
+++ b/cs/src/core/Allocator/BlittableAllocator.cs
@@ -121,7 +121,7 @@ public override void Dispose()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
///
///
- protected override void AllocatePage(int index)
+ internal override void AllocatePage(int index)
{
var adjustedSize = PageSize + 2 * sectorSize;
byte[] tmp = new byte[adjustedSize];
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 3e8ac26c0..e5822429f 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -197,7 +197,7 @@ internal override void DeleteFromMemory()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
///
///
- protected override void AllocatePage(int index)
+ internal override void AllocatePage(int index)
{
values[index] = AllocatePage();
PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed;
@@ -312,6 +312,8 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
}
fixed (RecordInfo* pin = &src[0].info)
{
+ Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
+
Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start,
numBytesToWrite - start, numBytesToWrite - start);
}
@@ -320,6 +322,8 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
{
fixed (RecordInfo* pin = &src[0].info)
{
+ Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)buffer.handle.AddrOfPinnedObject() + buffer.buffer.Length);
+
Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start,
numBytesToWrite - aligned_start, numBytesToWrite - aligned_start);
}
@@ -494,7 +498,7 @@ private void AsyncFlushPartialObjectLogCallback(uint errorCode, uint n
{
if (errorCode != 0)
{
- Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
+ Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}
// Set the page status to flushed
@@ -513,26 +517,17 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
PageAsyncReadResult result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult;
- var src = values[result.page % BufferSize];
+ Record[] src;
// We are reading into a frame
if (result.frame != null)
{
var frame = (GenericFrame)result.frame;
src = frame.GetPage(result.page % frame.frameSize);
-
- if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
- {
- PopulatePageFrame(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, src);
- }
}
else
- {
- if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
- {
- PopulatePage(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, result.page);
- }
- }
+ src = values[result.page % BufferSize];
+
// Deserialize all objects until untilptr
if (result.resumePtr < result.untilPtr)
@@ -560,9 +555,8 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
// We will be re-issuing I/O, so free current overlap
Overlapped.Free(overlap);
- // Compute new untilPtr
// We will now be able to process all records until (but not including) untilPtr
- GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, src, out long startptr, out long size);
+ GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, out long startptr, out long size);
// Object log fragment should be aligned by construction
Debug.Assert(startptr % sectorSize == 0);
@@ -570,9 +564,9 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num
if (size > int.MaxValue)
throw new Exception("Unable to read object page, total size greater than 2GB: " + size);
- var objBuffer = bufferPool.Get((int)size);
- result.freeBuffer2 = objBuffer;
var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);
+ var objBuffer = bufferPool.Get((int)alignedLength);
+ result.freeBuffer2 = objBuffer;
// Request objects from objlog
result.objlogDevice.ReadAsync(
@@ -718,7 +712,10 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]
while (ptr < untilptr)
{
- if (!src[ptr / recordSize].info.Invalid)
+ ref Record record = ref Unsafe.AsRef>(raw + ptr);
+ src[ptr / recordSize].info = record.info;
+
+ if (!record.info.Invalid)
{
if (KeyHasObjects())
{
@@ -729,21 +726,32 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]
stream.Seek(streamStartPos + key_addr->Address - start_addr, SeekOrigin.Begin);
}
- src[ptr/recordSize].key = new Key();
+ src[ptr / recordSize].key = new Key();
keySerializer.Deserialize(ref src[ptr/recordSize].key);
- }
+ }
+ else
+ {
+ src[ptr / recordSize].key = record.key;
+ }
- if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
+ if (!record.info.Tombstone)
{
- var value_addr = GetValueAddressInfo((long)raw + ptr);
- if (start_addr == -1) start_addr = value_addr->Address;
- if (stream.Position != streamStartPos + value_addr->Address - start_addr)
+ if (ValueHasObjects())
{
- stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
+ var value_addr = GetValueAddressInfo((long)raw + ptr);
+ if (start_addr == -1) start_addr = value_addr->Address;
+ if (stream.Position != streamStartPos + value_addr->Address - start_addr)
+ {
+ stream.Seek(streamStartPos + value_addr->Address - start_addr, SeekOrigin.Begin);
+ }
+
+ src[ptr / recordSize].value = new Value();
+ valueSerializer.Deserialize(ref src[ptr / recordSize].value);
+ }
+ else
+ {
+ src[ptr / recordSize].value = record.value;
}
-
- src[ptr / recordSize].value = new Value();
- valueSerializer.Deserialize(ref src[ptr/recordSize].value);
}
}
ptr += GetRecordSize(ptr);
@@ -765,17 +773,18 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record[]
///
///
///
- ///
///
///
- public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, Record[] src, out long startptr, out long size)
+ public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBlockSize, out long startptr, out long size)
{
long minObjAddress = long.MaxValue;
long maxObjAddress = long.MinValue;
while (ptr < untilptr)
{
- if (!src[ptr/recordSize].info.Invalid)
+ ref Record record = ref Unsafe.AsRef>(raw + ptr);
+
+ if (!record.info.Invalid)
{
if (KeyHasObjects())
{
@@ -794,7 +803,7 @@ public void GetObjectInfo(byte* raw, ref long ptr, long untilptr, int objectBloc
}
- if (ValueHasObjects() && !src[ptr / recordSize].info.Tombstone)
+ if (ValueHasObjects() && !record.info.Tombstone)
{
var value_addr = GetValueAddressInfo((long)raw + ptr);
var addr = value_addr->Address;
@@ -941,6 +950,8 @@ internal void PopulatePage(byte* src, int required_bytes, ref Record
{
fixed (RecordInfo* pin = &destinationPage[0].info)
{
+ Debug.Assert(required_bytes <= recordSize * destinationPage.Length);
+
Buffer.MemoryCopy(src, Unsafe.AsPointer(ref destinationPage[0]), required_bytes, required_bytes);
}
}
diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs
index 8c3bc60ae..bca812b17 100644
--- a/cs/src/core/Allocator/MallocFixedPageSize.cs
+++ b/cs/src/core/Allocator/MallocFixedPageSize.cs
@@ -422,10 +422,11 @@ public void Dispose()
/// Public facing persistence API
///
///
+ ///
///
- public void TakeCheckpoint(IDevice device, out ulong numBytes)
+ public void TakeCheckpoint(IDevice device, ulong start_offset, out ulong numBytes)
{
- BeginCheckpoint(device, 0UL, out numBytes);
+ BeginCheckpoint(device, start_offset, out numBytes);
}
///
@@ -515,9 +516,10 @@ public int GetPageSize()
///
///
///
- public void Recover(IDevice device, int buckets, ulong numBytes)
+ ///
+ public void Recover(IDevice device, ulong offset, int buckets, ulong numBytes)
{
- BeginRecovery(device, 0UL, buckets, numBytes, out ulong numBytesRead);
+ BeginRecovery(device, offset, buckets, numBytes, out ulong numBytesRead);
}
///
diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
index 353409c27..3abd21c70 100644
--- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs
+++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
@@ -193,7 +193,7 @@ public override void Dispose()
/// Allocate memory page, pinned in memory, and in sector aligned form, if possible
///
///
- protected override void AllocatePage(int index)
+ internal override void AllocatePage(int index)
{
var adjustedSize = PageSize + 2 * sectorSize;
byte[] tmp = new byte[adjustedSize];
diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs
index 02b51679e..9428fd848 100644
--- a/cs/src/core/Device/Devices.cs
+++ b/cs/src/core/Device/Devices.cs
@@ -8,8 +8,6 @@
namespace FASTER.core
{
-
-
///
/// Factory to create FASTER objects
///
diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs
index dcbde31fc..1680e4a22 100644
--- a/cs/src/core/Device/LocalStorageDevice.cs
+++ b/cs/src/core/Device/LocalStorageDevice.cs
@@ -18,7 +18,8 @@ public class LocalStorageDevice : StorageDeviceBase
{
private readonly bool preallocateFile;
private readonly bool deleteOnClose;
- private readonly ConcurrentDictionary logHandles;
+ private readonly bool disableFileBuffering;
+ private readonly SafeConcurrentDictionary logHandles;
///
/// Constructor
@@ -26,13 +27,15 @@ public class LocalStorageDevice : StorageDeviceBase
///
///
///
- public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false)
+ ///
+ public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true)
: base(filename, GetSectorSize(filename))
{
Native32.EnableProcessPrivileges();
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
- logHandles = new ConcurrentDictionary();
+ this.disableFileBuffering = disableFileBuffering;
+ logHandles = new SafeConcurrentDictionary();
}
///
@@ -143,11 +146,21 @@ public override void Close()
logHandle.Dispose();
}
+ ///
+ ///
+ ///
+ ///
+ ///
protected string GetSegmentName(int segmentId)
{
return FileName + "." + segmentId;
}
+ ///
+ ///
+ ///
+ ///
+ ///
// Can be used to pre-load handles, e.g., after a checkpoint
protected SafeFileHandle GetOrAddHandle(int _segmentId)
{
@@ -175,7 +188,11 @@ private SafeFileHandle CreateHandle(int segmentId)
uint fileCreation = unchecked((uint)FileMode.OpenOrCreate);
uint fileFlags = Native32.FILE_FLAG_OVERLAPPED;
- fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
+ if (this.disableFileBuffering)
+ {
+ fileFlags = fileFlags | Native32.FILE_FLAG_NO_BUFFERING;
+ }
+
if (deleteOnClose)
{
fileFlags = fileFlags | Native32.FILE_FLAG_DELETE_ON_CLOSE;
diff --git a/cs/src/core/FASTER.core.csproj b/cs/src/core/FASTER.core.csproj
index c2dbeca27..1d68a5425 100644
--- a/cs/src/core/FASTER.core.csproj
+++ b/cs/src/core/FASTER.core.csproj
@@ -37,18 +37,4 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/cs/src/core/FASTER.core.debug.nuspec b/cs/src/core/FASTER.core.debug.nuspec
index c9bea5301..bf8cfaeca 100644
--- a/cs/src/core/FASTER.core.debug.nuspec
+++ b/cs/src/core/FASTER.core.debug.nuspec
@@ -7,7 +7,7 @@
Microsoft
Microsoft
https://github.com/Microsoft/FASTER
- https://raw.githubusercontent.com/Microsoft/FASTER/master/LICENSE
+ MIT
true
Debug version. FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data
See the project website at https://github.com/Microsoft/FASTER for more details
diff --git a/cs/src/core/FASTER.core.nuspec b/cs/src/core/FASTER.core.nuspec
index bd9db8180..a73b87661 100644
--- a/cs/src/core/FASTER.core.nuspec
+++ b/cs/src/core/FASTER.core.nuspec
@@ -7,7 +7,7 @@
Microsoft
Microsoft
https://github.com/Microsoft/FASTER
- https://raw.githubusercontent.com/Microsoft/FASTER/master/LICENSE
+ MIT
true
FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data
See the project website at https://github.com/Microsoft/FASTER for more details
diff --git a/cs/src/core/Index/Common/AddressInfo.cs b/cs/src/core/Index/Common/AddressInfo.cs
index 185305848..a9a78a962 100644
--- a/cs/src/core/Index/Common/AddressInfo.cs
+++ b/cs/src/core/Index/Common/AddressInfo.cs
@@ -87,7 +87,7 @@ public long Address
word = (IntPtr)_word;
if (value != Address)
{
- throw new Exception("Overflow in AddressInfo");
+ throw new Exception("Overflow in AddressInfo" + ((kAddressBits < 64) ? " - consider running the program in x64 mode for larger address space support" : ""));
}
}
}
diff --git a/cs/src/core/Index/Common/CheckpointSettings.cs b/cs/src/core/Index/Common/CheckpointSettings.cs
index ba76f5016..6e20f6dbb 100644
--- a/cs/src/core/Index/Common/CheckpointSettings.cs
+++ b/cs/src/core/Index/Common/CheckpointSettings.cs
@@ -2,6 +2,8 @@
// Licensed under the MIT license.
+using System;
+
namespace FASTER.core
{
///
@@ -27,13 +29,20 @@ public enum CheckpointType
public class CheckpointSettings
{
///
- /// Directory where checkpoints are stored
+ /// Checkpoint manager
///
- public string CheckpointDir = "";
+ public ICheckpointManager CheckpointManager = null;
///
/// Type of checkpoint
///
public CheckpointType CheckPointType = CheckpointType.Snapshot;
+
+ ///
+ /// Use specified directory for storing and retrieving checkpoints
+ /// This is a shortcut to providing the following:
+ /// CheckpointSettings.CheckpointManager = new LocalCheckpointManager(CheckpointDir)
+ ///
+ public string CheckpointDir = null;
}
}
diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs
index 5d69fbdc7..91910ea9f 100644
--- a/cs/src/core/Index/Common/Contexts.cs
+++ b/cs/src/core/Index/Common/Contexts.cs
@@ -106,139 +106,7 @@ internal class FasterExecutionContext : SerializedFasterExecutionContext
}
}
- internal class DirectoryConfiguration
- {
- private readonly string checkpointDir;
- public DirectoryConfiguration(string checkpointDir)
- {
- this.checkpointDir = checkpointDir;
- }
-
- public const string index_base_folder = "index-checkpoints";
- public const string index_meta_file = "info";
- public const string hash_table_file = "ht";
- public const string overflow_buckets_file = "ofb";
- public const string snapshot_file = "snapshot";
-
- public const string cpr_base_folder = "cpr-checkpoints";
- public const string cpr_meta_file = "info";
-
- public void CreateIndexCheckpointFolder(Guid token)
- {
- var directory = GetIndexCheckpointFolder(token);
- Directory.CreateDirectory(directory);
- DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory);
- foreach (System.IO.FileInfo file in directoryInfo.GetFiles())
- file.Delete();
- }
- public void CreateHybridLogCheckpointFolder(Guid token)
- {
- var directory = GetHybridLogCheckpointFolder(token);
- Directory.CreateDirectory(directory);
- DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory);
- foreach (System.IO.FileInfo file in directoryInfo.GetFiles())
- file.Delete();
- }
-
- public string GetIndexCheckpointFolder(Guid token = default(Guid))
- {
- if (token != default(Guid))
- return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString());
- else
- return GetMergedFolderPath(checkpointDir, index_base_folder);
- }
-
- public string GetHybridLogCheckpointFolder(Guid token = default(Guid))
- {
- if (token != default(Guid))
- return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString());
- else
- return GetMergedFolderPath(checkpointDir, cpr_base_folder);
- }
-
- public string GetIndexCheckpointMetaFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- index_base_folder,
- token.ToString(),
- index_meta_file,
- ".dat");
- }
-
- public string GetPrimaryHashTableFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- index_base_folder,
- token.ToString(),
- hash_table_file,
- ".dat");
- }
-
- public string GetOverflowBucketsFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- index_base_folder,
- token.ToString(),
- overflow_buckets_file,
- ".dat");
- }
-
- public string GetHybridLogCheckpointMetaFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- cpr_base_folder,
- token.ToString(),
- cpr_meta_file,
- ".dat");
- }
-
- public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken)
- {
- return GetMergedFolderPath(checkpointDir,
- cpr_base_folder,
- checkpointToken.ToString(),
- sessionToken.ToString(),
- ".dat");
- }
-
- public string GetHybridLogCheckpointFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- cpr_base_folder,
- token.ToString(),
- snapshot_file,
- ".dat");
- }
-
- public string GetHybridLogObjectCheckpointFileName(Guid token)
- {
- return GetMergedFolderPath(checkpointDir,
- cpr_base_folder,
- token.ToString(),
- snapshot_file,
- ".obj.dat");
- }
-
- public static string GetMergedFolderPath(params String[] paths)
- {
- String fullPath = paths[0];
-
- for (int i = 1; i < paths.Length; i++)
- {
- if (i == paths.Length - 1 && paths[i].Contains("."))
- {
- fullPath += paths[i];
- }
- else
- {
- fullPath += Path.DirectorySeparatorChar + paths[i];
- }
- }
-
- return fullPath;
- }
- }
-
+
///
/// Recovery info for hybrid log
///
@@ -273,13 +141,24 @@ public struct HybridLogRecoveryInfo
///
public long finalLogicalAddress;
///
+ /// Head address
+ ///
+ public long headAddress;
+ ///
/// Guid array
///
public Guid[] guids;
+
///
- /// Tokens per guid
+ /// Tokens per guid restored during Continue
///
- public Dictionary<Guid, long> continueTokens;
+ public ConcurrentDictionary<Guid, long> continueTokens;
+
+ ///
+ /// Tokens per guid created during Checkpoint
+ ///
+ public ConcurrentDictionary<Guid, long> checkpointTokens;
+
///
/// Object log segment offsets
///
@@ -299,8 +178,10 @@ public void Initialize(Guid token, int _version)
flushedLogicalAddress = 0;
startLogicalAddress = 0;
finalLogicalAddress = 0;
+ headAddress = 0;
guids = new Guid[LightEpoch.kTableSize + 1];
- continueTokens = new Dictionary<Guid, long>();
+ continueTokens = new ConcurrentDictionary<Guid, long>();
+ checkpointTokens = new ConcurrentDictionary<Guid, long>();
objectLogSegmentOffsets = null;
}
@@ -311,7 +192,7 @@ public void Initialize(Guid token, int _version)
public void Initialize(StreamReader reader)
{
guids = new Guid[LightEpoch.kTableSize + 1];
- continueTokens = new Dictionary<Guid, long>();
+ continueTokens = new ConcurrentDictionary<Guid, long>();
string value = reader.ReadLine();
guid = Guid.Parse(value);
@@ -331,6 +212,9 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
finalLogicalAddress = long.Parse(value);
+ value = reader.ReadLine();
+ headAddress = long.Parse(value);
+
value = reader.ReadLine();
numThreads = int.Parse(value);
@@ -338,6 +222,9 @@ public void Initialize(StreamReader reader)
{
value = reader.ReadLine();
guids[i] = Guid.Parse(value);
+ value = reader.ReadLine();
+ var serialno = long.Parse(value);
+ continueTokens.TryAdd(guids[i], serialno);
}
// Read object log segment offsets
@@ -354,51 +241,19 @@ public void Initialize(StreamReader reader)
}
}
- ///
- /// Recover info from token and checkpoint directory
- ///
- ///
- ///
- ///
- public bool Recover(Guid token, string checkpointDir)
- {
- return Recover(token, new DirectoryConfiguration(checkpointDir));
- }
-
///
/// Recover info from token
///
///
- ///
+ ///
///
- internal bool Recover(Guid token, DirectoryConfiguration directoryConfiguration)
+ internal void Recover(Guid token, ICheckpointManager checkpointManager)
{
- string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(token);
- using (var reader = new StreamReader(checkpointInfoFile))
- {
- Initialize(reader);
- }
+ var metadata = checkpointManager.GetLogCommitMetadata(token);
+ if (metadata == null)
+ throw new Exception("Invalid log commit metadata for ID " + token.ToString());
- int num_threads = numThreads;
- for (int i = 0; i < num_threads; i++)
- {
- var guid = guids[i];
- using (var reader = new StreamReader(directoryConfiguration.GetHybridLogCheckpointContextFileName(token, guid)))
- {
- var ctx = new SerializedFasterExecutionContext();
- ctx.Load(reader);
- continueTokens.Add(ctx.guid, ctx.serialNum);
- }
- }
-
- if (continueTokens.Count == num_threads)
- {
- return true;
- }
- else
- {
- return false;
- }
+ Initialize(new StreamReader(new MemoryStream(metadata)));
}
///
@@ -410,31 +265,39 @@ public void Reset()
}
///
- /// Write info to file
+ /// Write info to byte array
///
- ///
- public void Write(StreamWriter writer)
+ public byte[] ToByteArray()
{
- writer.WriteLine(guid);
- writer.WriteLine(useSnapshotFile);
- writer.WriteLine(version);
- writer.WriteLine(flushedLogicalAddress);
- writer.WriteLine(startLogicalAddress);
- writer.WriteLine(finalLogicalAddress);
- writer.WriteLine(numThreads);
- for (int i = 0; i < numThreads; i++)
- {
- writer.WriteLine(guids[i]);
- }
-
- //Write object log segment offsets
- writer.WriteLine(objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
- if (objectLogSegmentOffsets != null)
+ using (var ms = new MemoryStream())
{
- for (int i = 0; i < objectLogSegmentOffsets.Length; i++)
+ using (StreamWriter writer = new StreamWriter(ms))
{
- writer.WriteLine(objectLogSegmentOffsets[i]);
+ writer.WriteLine(guid);
+ writer.WriteLine(useSnapshotFile);
+ writer.WriteLine(version);
+ writer.WriteLine(flushedLogicalAddress);
+ writer.WriteLine(startLogicalAddress);
+ writer.WriteLine(finalLogicalAddress);
+ writer.WriteLine(headAddress);
+ writer.WriteLine(numThreads);
+ for (int i = 0; i < numThreads; i++)
+ {
+ writer.WriteLine(guids[i]);
+ writer.WriteLine(checkpointTokens[guids[i]]);
+ }
+
+ // Write object log segment offsets
+ writer.WriteLine(objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
+ if (objectLogSegmentOffsets != null)
+ {
+ for (int i = 0; i < objectLogSegmentOffsets.Length; i++)
+ {
+ writer.WriteLine(objectLogSegmentOffsets[i]);
+ }
+ }
}
+ return ms.ToArray();
}
}
@@ -449,6 +312,7 @@ public void DebugPrint()
Debug.WriteLine("Flushed LogicalAddress: {0}", flushedLogicalAddress);
Debug.WriteLine("Start Logical Address: {0}", startLogicalAddress);
Debug.WriteLine("Final Logical Address: {0}", finalLogicalAddress);
+ Debug.WriteLine("Head Address: {0}", headAddress);
Debug.WriteLine("Num sessions recovered: {0}", numThreads);
Debug.WriteLine("Recovered sessions: ");
foreach (var sessionInfo in continueTokens)
@@ -466,16 +330,19 @@ internal struct HybridLogCheckpointInfo
public CountdownEvent flushed;
public long started;
- public void Initialize(Guid token, int _version)
+ public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager)
{
info.Initialize(token, _version);
started = 0;
+ checkpointManager.InitializeLogCheckpoint(token);
}
- public void Recover(Guid token, DirectoryConfiguration directoryConfiguration)
+
+ public void Recover(Guid token, ICheckpointManager checkpointManager)
{
- info.Recover(token, directoryConfiguration);
+ info.Recover(token, checkpointManager);
started = 0;
}
+
public void Reset()
{
started = 0;
@@ -529,24 +396,34 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
finalLogicalAddress = long.Parse(value);
}
- public void Recover(Guid guid, DirectoryConfiguration directoryConfiguration)
+
+ public void Recover(Guid guid, ICheckpointManager checkpointManager)
{
- string indexInfoFile = directoryConfiguration.GetIndexCheckpointMetaFileName(guid);
- using (var reader = new StreamReader(indexInfoFile))
- {
- Initialize(reader);
- }
+ var metadata = checkpointManager.GetIndexCommitMetadata(guid);
+ if (metadata == null)
+ throw new Exception("Invalid index commit metadata for ID " + guid.ToString());
+ Initialize(new StreamReader(new MemoryStream(metadata)));
}
- public void Write(StreamWriter writer)
+
+ public byte[] ToByteArray()
{
- writer.WriteLine(token);
- writer.WriteLine(table_size);
- writer.WriteLine(num_ht_bytes);
- writer.WriteLine(num_ofb_bytes);
- writer.WriteLine(num_buckets);
- writer.WriteLine(startLogicalAddress);
- writer.WriteLine(finalLogicalAddress);
+ using (var ms = new MemoryStream())
+ {
+ using (var writer = new StreamWriter(ms))
+ {
+
+ writer.WriteLine(token);
+ writer.WriteLine(table_size);
+ writer.WriteLine(num_ht_bytes);
+ writer.WriteLine(num_ofb_bytes);
+ writer.WriteLine(num_buckets);
+ writer.WriteLine(startLogicalAddress);
+ writer.WriteLine(finalLogicalAddress);
+ }
+ return ms.ToArray();
+ }
}
+
public void DebugPrint()
{
Debug.WriteLine("******** Index Checkpoint Info for {0} ********", token);
@@ -573,23 +450,21 @@ internal struct IndexCheckpointInfo
{
public IndexRecoveryInfo info;
public IDevice main_ht_device;
- public IDevice ofb_device;
- public void Initialize(Guid token, long _size, DirectoryConfiguration directoryConfiguration)
+ public void Initialize(Guid token, long _size, ICheckpointManager checkpointManager)
{
info.Initialize(token, _size);
- main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false);
- ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false);
+ checkpointManager.InitializeIndexCheckpoint(token);
+ main_ht_device = checkpointManager.GetIndexDevice(token);
}
- public void Recover(Guid token, DirectoryConfiguration directoryConfiguration)
+ public void Recover(Guid token, ICheckpointManager checkpointManager)
{
- info.Recover(token, directoryConfiguration);
+ info.Recover(token, checkpointManager);
}
public void Reset()
{
info.Reset();
main_ht_device.Close();
- ofb_device.Close();
}
}
}
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index 2bd1ad96c..3257bbe45 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -4,6 +4,7 @@
#pragma warning disable 0162
using System;
+using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
namespace FASTER.core
@@ -65,7 +66,7 @@ private enum CheckpointType
private HybridLogCheckpointInfo _hybridLogCheckpoint;
- private SafeConcurrentDictionary<Guid, long> _recoveredSessions;
+ private ConcurrentDictionary<Guid, long> _recoveredSessions;
private FastThreadLocal<FasterExecutionContext> prevThreadCtx;
private FastThreadLocal<FasterExecutionContext> threadCtx;
@@ -104,7 +105,10 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
if (checkpointSettings == null)
checkpointSettings = new CheckpointSettings();
- directoryConfiguration = new DirectoryConfiguration(checkpointSettings.CheckpointDir);
+ if (checkpointSettings.CheckpointDir != null && checkpointSettings.CheckpointManager != null)
+ throw new Exception("Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both");
+
+ checkpointManager = checkpointSettings.CheckpointManager ?? new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? "");
FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
CopyReadsToTail = logSettings.CopyReadsToTail;
diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs
similarity index 91%
rename from cs/src/core/Index/FASTER/Checkpoint.cs
rename to cs/src/core/Index/Recovery/Checkpoint.cs
index bf26ea730..f17a8fc10 100644
--- a/cs/src/core/Index/FASTER/Checkpoint.cs
+++ b/cs/src/core/Index/Recovery/Checkpoint.cs
@@ -247,10 +247,10 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
{
_indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress();
ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress);
- WriteIndexMetaFile();
- WriteIndexCheckpointCompleteFile();
}
+ _hybridLogCheckpoint.info.headAddress = hlog.HeadAddress;
+
if (FoldOverSnapshot)
{
hlog.ShiftReadOnlyToTail(out long tailAddress);
@@ -261,10 +261,8 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
{
ObtainCurrentTailAddress(ref _hybridLogCheckpoint.info.finalLogicalAddress);
- _hybridLogCheckpoint.snapshotFileDevice = Devices.CreateLogDevice
- (directoryConfiguration.GetHybridLogCheckpointFileName(_hybridLogCheckpointToken), false);
- _hybridLogCheckpoint.snapshotFileObjectLogDevice = Devices.CreateLogDevice
- (directoryConfiguration.GetHybridLogObjectCheckpointFileName(_hybridLogCheckpointToken), false);
+ _hybridLogCheckpoint.snapshotFileDevice = checkpointManager.GetSnapshotLogDevice(_hybridLogCheckpointToken);
+ _hybridLogCheckpoint.snapshotFileObjectLogDevice = checkpointManager.GetSnapshotObjectLogDevice(_hybridLogCheckpointToken);
_hybridLogCheckpoint.snapshotFileDevice.Initialize(hlog.GetSegmentSize());
_hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(hlog.GetSegmentSize());
@@ -292,7 +290,10 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
case Phase.PERSISTENCE_CALLBACK:
{
WriteHybridLogMetaInfo();
- WriteHybridLogCheckpointCompleteFile();
+
+ if (_checkpointType == CheckpointType.FULL)
+ WriteIndexMetaInfo();
+
MakeTransition(intermediateState, nextState);
break;
}
@@ -344,8 +345,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
{
_indexCheckpoint.info.num_buckets = overflowBucketsAllocator.GetMaxValidAddress();
ObtainCurrentTailAddress(ref _indexCheckpoint.info.finalLogicalAddress);
- WriteIndexMetaFile();
- WriteIndexCheckpointCompleteFile();
+ WriteIndexMetaInfo();
_indexCheckpoint.Reset();
break;
}
@@ -524,7 +524,7 @@ private void HandleCheckpointingPhases()
if (notify)
{
- WriteHybridLogContextInfo();
+ _hybridLogCheckpoint.info.checkpointTokens.TryAdd(prevThreadCtx.Value.guid, prevThreadCtx.Value.serialNum);
if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
{
@@ -659,12 +659,12 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp
case Phase.INDEX_CHECKPOINT:
switch(type)
{
- case CheckpointType.INDEX_ONLY:
- nextState.phase = Phase.REST;
- break;
case CheckpointType.FULL:
nextState.phase = Phase.PREPARE;
break;
+ default:
+ nextState.phase = Phase.REST;
+ break;
}
break;
case Phase.PREPARE:
@@ -699,56 +699,14 @@ private SystemState GetNextState(SystemState start, CheckpointType type = Checkp
private void WriteHybridLogMetaInfo()
{
- string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(_hybridLogCheckpointToken);
- using (var file = new StreamWriter(filename, false))
- {
- _hybridLogCheckpoint.info.Write(file);
- file.Flush();
- }
- }
-
- private void WriteHybridLogCheckpointCompleteFile()
- {
- string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(_hybridLogCheckpointToken);
- completed_filename += Path.DirectorySeparatorChar + "completed.dat";
- using (var file = new StreamWriter(completed_filename, false))
- {
- file.WriteLine();
- file.Flush();
- }
- }
-
- private void WriteHybridLogContextInfo()
- {
- string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.Value.guid);
- using (var file = new StreamWriter(filename, false))
- {
- prevThreadCtx.Value.Write(file);
- file.Flush();
- }
-
+ checkpointManager.CommitLogCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.ToByteArray());
}
- private void WriteIndexMetaFile()
+ private void WriteIndexMetaInfo()
{
- string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(_indexCheckpointToken);
- using (var file = new StreamWriter(filename, false))
- {
- _indexCheckpoint.info.Write(file);
- file.Flush();
- }
+ checkpointManager.CommitIndexCheckpoint(_indexCheckpointToken, _indexCheckpoint.info.ToByteArray());
}
- private void WriteIndexCheckpointCompleteFile()
- {
- string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(_indexCheckpointToken);
- completed_filename += Path.DirectorySeparatorChar + "completed.dat";
- using (var file = new StreamWriter(completed_filename, false))
- {
- file.WriteLine();
- file.Flush();
- }
- }
private bool ObtainCurrentTailAddress(ref long location)
{
var tailAddress = hlog.GetTailAddress();
@@ -757,14 +715,12 @@ private bool ObtainCurrentTailAddress(ref long location)
private void InitializeIndexCheckpoint(Guid indexToken)
{
- directoryConfiguration.CreateIndexCheckpointFolder(indexToken);
- _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, directoryConfiguration);
+ _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, checkpointManager);
}
private void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version)
{
- directoryConfiguration.CreateHybridLogCheckpointFolder(hybridLogToken);
- _hybridLogCheckpoint.Initialize(hybridLogToken, version);
+ _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager);
}
#endregion
diff --git a/cs/src/core/Index/Recovery/DirectoryConfiguration.cs b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs
new file mode 100644
index 000000000..5868b2172
--- /dev/null
+++ b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs
@@ -0,0 +1,134 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.IO;
+
+namespace FASTER.core
+{
+ class DirectoryConfiguration
+ {
+ private readonly string checkpointDir;
+
+ public DirectoryConfiguration(string checkpointDir)
+ {
+ this.checkpointDir = checkpointDir;
+ }
+
+ public const string index_base_folder = "index-checkpoints";
+ public const string index_meta_file = "info";
+ public const string hash_table_file = "ht";
+ public const string overflow_buckets_file = "ofb";
+ public const string snapshot_file = "snapshot";
+
+ public const string cpr_base_folder = "cpr-checkpoints";
+ public const string cpr_meta_file = "info";
+
+ public void CreateIndexCheckpointFolder(Guid token)
+ {
+ var directory = GetIndexCheckpointFolder(token);
+ Directory.CreateDirectory(directory);
+ DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory);
+ foreach (System.IO.FileInfo file in directoryInfo.GetFiles())
+ file.Delete();
+ }
+ public void CreateHybridLogCheckpointFolder(Guid token)
+ {
+ var directory = GetHybridLogCheckpointFolder(token);
+ Directory.CreateDirectory(directory);
+ DirectoryInfo directoryInfo = new System.IO.DirectoryInfo(directory);
+ foreach (System.IO.FileInfo file in directoryInfo.GetFiles())
+ file.Delete();
+ }
+
+ public string GetIndexCheckpointFolder(Guid token = default(Guid))
+ {
+ if (token != default(Guid))
+ return GetMergedFolderPath(checkpointDir, index_base_folder, token.ToString());
+ else
+ return GetMergedFolderPath(checkpointDir, index_base_folder);
+ }
+
+ public string GetHybridLogCheckpointFolder(Guid token = default(Guid))
+ {
+ if (token != default(Guid))
+ return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString());
+ else
+ return GetMergedFolderPath(checkpointDir, cpr_base_folder);
+ }
+
+ public string GetIndexCheckpointMetaFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir,
+ index_base_folder,
+ token.ToString(),
+ index_meta_file,
+ ".dat");
+ }
+
+ public string GetPrimaryHashTableFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir,
+ index_base_folder,
+ token.ToString(),
+ hash_table_file,
+ ".dat");
+ }
+
+ public string GetOverflowBucketsFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir,
+ index_base_folder,
+ token.ToString(),
+ overflow_buckets_file,
+ ".dat");
+ }
+
+ public string GetHybridLogCheckpointMetaFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir,
+ cpr_base_folder,
+ token.ToString(),
+ cpr_meta_file,
+ ".dat");
+ }
+
+ public string GetHybridLogCheckpointContextFileName(Guid checkpointToken, Guid sessionToken)
+ {
+ return GetMergedFolderPath(checkpointDir,
+ cpr_base_folder,
+ checkpointToken.ToString(),
+ sessionToken.ToString(),
+ ".dat");
+ }
+
+ public string GetLogSnapshotFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".dat");
+ }
+
+ public string GetObjectLogSnapshotFileName(Guid token)
+ {
+ return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".obj.dat");
+ }
+
+ private static string GetMergedFolderPath(params String[] paths)
+ {
+ String fullPath = paths[0];
+
+ for (int i = 1; i < paths.Length; i++)
+ {
+ if (i == paths.Length - 1 && paths[i].Contains("."))
+ {
+ fullPath += paths[i];
+ }
+ else
+ {
+ fullPath += Path.DirectorySeparatorChar + paths[i];
+ }
+ }
+
+ return fullPath;
+ }
+ }
+}
\ No newline at end of file
diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs
new file mode 100644
index 000000000..947d06df4
--- /dev/null
+++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs
@@ -0,0 +1,111 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace FASTER.core
+{
+ ///
+ /// Interface for users to control creation and retrieval of checkpoint-related data
+ /// FASTER calls this interface during checkpoint/recovery in this sequence:
+ ///
+ /// Checkpoint:
+ /// InitializeIndexCheckpoint (for index checkpoints) ->
+ /// GetIndexDevice (for index checkpoints) ->
+ /// InitializeLogCheckpoint (for log checkpoints) ->
+ /// GetSnapshotLogDevice (for log checkpoints in snapshot mode) ->
+ /// GetSnapshotObjectLogDevice (for log checkpoints in snapshot mode with objects) ->
+ /// CommitLogCheckpoint (for log checkpoints) ->
+ /// CommitIndexCheckpoint (for index checkpoints) ->
+ ///
+ /// Recovery:
+ /// GetLatestCheckpoint (if request to recover to latest checkpoint) ->
+ /// GetIndexCommitMetadata ->
+ /// GetLogCommitMetadata ->
+ /// GetIndexDevice ->
+ /// GetSnapshotLogDevice (for recovery in snapshot mode) ->
+ /// GetSnapshotObjectLogDevice (for recovery in snapshot mode with objects)
+ ///
+ /// Provided devices will be closed directly by FASTER when done.
+ ///
+ public interface ICheckpointManager
+ {
+ ///
+ /// Initialize index checkpoint
+ ///
+ ///
+ void InitializeIndexCheckpoint(Guid indexToken);
+
+ ///
+ /// Initialize log checkpoint (snapshot and fold-over)
+ ///
+ ///
+ void InitializeLogCheckpoint(Guid logToken);
+
+ ///
+ /// Commit index checkpoint
+ ///
+ ///
+ ///
+ ///
+ void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata);
+
+ ///
+ /// Commit log checkpoint (snapshot and fold-over)
+ ///
+ ///
+ ///
+ ///
+ void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata);
+
+ ///
+ /// Retrieve commit metadata for specified index checkpoint
+ ///
+ /// Token
+ /// Metadata, or null if invalid
+ byte[] GetIndexCommitMetadata(Guid indexToken);
+
+ ///
+ /// Retrieve commit metadata for specified log checkpoint
+ ///
+ /// Token
+ /// Metadata, or null if invalid
+ byte[] GetLogCommitMetadata(Guid logToken);
+
+ ///
+ /// Provide device to store index checkpoint (including overflow buckets)
+ ///
+ ///
+ ///
+ IDevice GetIndexDevice(Guid indexToken);
+
+ ///
+ /// Provide device to store snapshot of log (required only for snapshot checkpoints)
+ ///
+ ///
+ ///
+ IDevice GetSnapshotLogDevice(Guid token);
+
+ ///
+ /// Provide device to store snapshot of object log (required only for snapshot checkpoints)
+ ///
+ ///
+ ///
+ IDevice GetSnapshotObjectLogDevice(Guid token);
+
+ ///
+ /// Get latest valid checkpoint for recovery
+ ///
+ ///
+ ///
+ /// true if latest valid checkpoint found, false otherwise
+ bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken);
+ }
+}
\ No newline at end of file
diff --git a/cs/src/core/Index/FASTER/IndexCheckpoint.cs b/cs/src/core/Index/Recovery/IndexCheckpoint.cs
similarity index 89%
rename from cs/src/core/Index/FASTER/IndexCheckpoint.cs
rename to cs/src/core/Index/Recovery/IndexCheckpoint.cs
index 0ec78b3ad..b0d228f5f 100644
--- a/cs/src/core/Index/FASTER/IndexCheckpoint.cs
+++ b/cs/src/core/Index/Recovery/IndexCheckpoint.cs
@@ -28,9 +28,10 @@ internal void TakeIndexFuzzyCheckpoint()
TakeMainIndexCheckpoint(ht_version,
_indexCheckpoint.main_ht_device,
out ulong ht_num_bytes_written);
- overflowBucketsAllocator.TakeCheckpoint(
- _indexCheckpoint.ofb_device,
- out ulong ofb_num_bytes_written);
+
+ var sectorSize = _indexCheckpoint.main_ht_device.SectorSize;
+ var alignedIndexSize = (uint)((ht_num_bytes_written + (sectorSize - 1)) & ~(sectorSize - 1));
+ overflowBucketsAllocator.TakeCheckpoint(_indexCheckpoint.main_ht_device, alignedIndexSize, out ulong ofb_num_bytes_written);
_indexCheckpoint.info.num_ht_bytes = ht_num_bytes_written;
_indexCheckpoint.info.num_ofb_bytes = ofb_num_bytes_written;
}
@@ -40,7 +41,9 @@ internal void TakeIndexFuzzyCheckpoint(int ht_version, IDevice device,
out ulong ofbnumBytesWritten, out int num_ofb_buckets)
{
TakeMainIndexCheckpoint(ht_version, device, out numBytesWritten);
- overflowBucketsAllocator.TakeCheckpoint(ofbdevice, out ofbnumBytesWritten);
+ var sectorSize = device.SectorSize;
+ var alignedIndexSize = (uint)((numBytesWritten + (sectorSize - 1)) & ~(sectorSize - 1));
+ overflowBucketsAllocator.TakeCheckpoint(ofbdevice, alignedIndexSize, out ofbnumBytesWritten);
num_ofb_buckets = overflowBucketsAllocator.GetMaxValidAddress();
}
diff --git a/cs/src/core/Index/FASTER/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs
similarity index 87%
rename from cs/src/core/Index/FASTER/IndexRecovery.cs
rename to cs/src/core/Index/Recovery/IndexRecovery.cs
index fca7d230d..d1f0bb918 100644
--- a/cs/src/core/Index/FASTER/IndexRecovery.cs
+++ b/cs/src/core/Index/Recovery/IndexRecovery.cs
@@ -19,7 +19,7 @@ namespace FASTER.core
///
public unsafe partial class FasterBase
{
- internal DirectoryConfiguration directoryConfiguration;
+ internal ICheckpointManager checkpointManager;
// Derived class exposed API
internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
@@ -29,24 +29,22 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
Debug.Assert(state[ht_version].size == info.info.table_size);
// Create devices to read from using Async API
- info.main_ht_device = Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(token), false);
- info.ofb_device = Devices.CreateLogDevice(directoryConfiguration.GetOverflowBucketsFileName(token), false);
+ info.main_ht_device = checkpointManager.GetIndexDevice(token);
BeginMainIndexRecovery(ht_version,
info.main_ht_device,
info.info.num_ht_bytes);
- overflowBucketsAllocator.Recover(
- info.ofb_device,
- info.info.num_buckets,
- info.info.num_ofb_bytes);
+ var sectorSize = info.main_ht_device.SectorSize;
+ var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1));
+
+ overflowBucketsAllocator.Recover(info.main_ht_device, alignedIndexSize, info.info.num_buckets, info.info.num_ofb_bytes);
// Wait until reading is complete
IsFuzzyIndexRecoveryComplete(true);
// close index checkpoint files appropriately
info.main_ht_device.Close();
- info.ofb_device.Close();
// Delete all tentative entries!
DeleteTentativeEntries();
@@ -55,7 +53,9 @@ internal void RecoverFuzzyIndex(IndexCheckpointInfo info)
internal void RecoverFuzzyIndex(int ht_version, IDevice device, ulong num_ht_bytes, IDevice ofbdevice, int num_buckets, ulong num_ofb_bytes)
{
BeginMainIndexRecovery(ht_version, device, num_ht_bytes);
- overflowBucketsAllocator.Recover(ofbdevice, num_buckets, num_ofb_bytes);
+ var sectorSize = device.SectorSize;
+ var alignedIndexSize = (uint)((num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1));
+ overflowBucketsAllocator.Recover(ofbdevice, alignedIndexSize, num_buckets, num_ofb_bytes);
}
internal bool IsFuzzyIndexRecoveryComplete(bool waitUntilComplete = false)
diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs
new file mode 100644
index 000000000..9eb270d95
--- /dev/null
+++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs
@@ -0,0 +1,206 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace FASTER.core
+{
+ ///
+ /// Implementation of checkpoint interface for local file storage
+ ///
+ public class LocalCheckpointManager : ICheckpointManager
+ {
+ private DirectoryConfiguration directoryConfiguration;
+
+ ///
+ /// Create new instance of local checkpoint manager at given base directory
+ ///
+ ///
+ public LocalCheckpointManager(string CheckpointDir)
+ {
+ directoryConfiguration = new DirectoryConfiguration(CheckpointDir);
+ }
+
+ ///
+ /// Initialize index checkpoint
+ ///
+ ///
+ public void InitializeIndexCheckpoint(Guid indexToken)
+ {
+ directoryConfiguration.CreateIndexCheckpointFolder(indexToken);
+ }
+
+ ///
+ /// Initialize log checkpoint (snapshot and fold-over)
+ ///
+ ///
+ public void InitializeLogCheckpoint(Guid logToken)
+ {
+ directoryConfiguration.CreateHybridLogCheckpointFolder(logToken);
+ }
+
+ ///
+ /// Commit index checkpoint
+ ///
+ ///
+ ///
+ public void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
+ {
+ string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken);
+ using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create)))
+ {
+ writer.Write(commitMetadata.Length);
+ writer.Write(commitMetadata);
+ writer.Flush();
+ }
+
+ string completed_filename = directoryConfiguration.GetIndexCheckpointFolder(indexToken);
+ completed_filename += Path.DirectorySeparatorChar + "completed.dat";
+ using (var file = new FileStream(completed_filename, FileMode.Create))
+ {
+ file.Flush();
+ }
+ }
+
+ ///
+ /// Commit log checkpoint (snapshot and fold-over)
+ ///
+ ///
+ ///
+ public void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
+ {
+ string filename = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken);
+ using (var writer = new BinaryWriter(new FileStream(filename, FileMode.Create)))
+ {
+ writer.Write(commitMetadata.Length);
+ writer.Write(commitMetadata);
+ writer.Flush();
+ }
+
+ string completed_filename = directoryConfiguration.GetHybridLogCheckpointFolder(logToken);
+ completed_filename += Path.DirectorySeparatorChar + "completed.dat";
+ using (var file = new FileStream(completed_filename, FileMode.Create))
+ {
+ file.Flush();
+ }
+ }
+
+ ///
+ /// Retrieve commit metadata for specified index checkpoint
+ ///
+ /// Token
+ /// Metadata, or null if invalid
+ public byte[] GetIndexCommitMetadata(Guid indexToken)
+ {
+ var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(indexToken));
+ if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+ return null;
+
+ string filename = directoryConfiguration.GetIndexCheckpointMetaFileName(indexToken);
+ using (var reader = new BinaryReader(new FileStream(filename, FileMode.Open)))
+ {
+ var len = reader.ReadInt32();
+ return reader.ReadBytes(len);
+ }
+ }
+
+ ///
+ /// Retrieve commit metadata for specified log checkpoint
+ ///
+ /// Token
+ /// Metadata, or null if invalid
+ public byte[] GetLogCommitMetadata(Guid logToken)
+ {
+ var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(logToken));
+ if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+ return null;
+
+ string checkpointInfoFile = directoryConfiguration.GetHybridLogCheckpointMetaFileName(logToken);
+ using (var reader = new BinaryReader(new FileStream(checkpointInfoFile, FileMode.Open)))
+ {
+ var len = reader.ReadInt32();
+ return reader.ReadBytes(len);
+ }
+ }
+
+ ///
+ /// Provide device to store index checkpoint (including overflow buckets)
+ ///
+ ///
+ ///
+ public IDevice GetIndexDevice(Guid indexToken)
+ {
+ return Devices.CreateLogDevice(directoryConfiguration.GetPrimaryHashTableFileName(indexToken), false);
+ }
+
+ ///
+ /// Provide device to store snapshot of log (required only for snapshot checkpoints)
+ ///
+ ///
+ ///
+ public IDevice GetSnapshotLogDevice(Guid token)
+ {
+ return Devices.CreateLogDevice(directoryConfiguration.GetLogSnapshotFileName(token), false);
+ }
+
+ ///
+ /// Provide device to store snapshot of object log (required only for snapshot checkpoints)
+ ///
+ ///
+ ///
+ public IDevice GetSnapshotObjectLogDevice(Guid token)
+ {
+ return Devices.CreateLogDevice(directoryConfiguration.GetObjectLogSnapshotFileName(token), false);
+ }
+
+ ///
+ /// Get latest valid checkpoint for recovery
+ ///
+ ///
+ ///
+ ///
+ public bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken)
+ {
+ var indexCheckpointDir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder());
+ var dirs = indexCheckpointDir.GetDirectories();
+ foreach (var dir in dirs)
+ {
+ // Remove incomplete checkpoints
+ if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+ {
+ Directory.Delete(dir.FullName, true);
+ }
+ }
+ var latestICFolder = indexCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First();
+ if (latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out indexToken))
+ {
+ throw new Exception("No valid index checkpoint to recover from");
+ }
+
+
+ var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder());
+ dirs = hlogCheckpointDir.GetDirectories();
+ foreach (var dir in dirs)
+ {
+ // Remove incomplete checkpoints
+ if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
+ {
+ Directory.Delete(dir.FullName, true);
+ }
+ }
+ var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First();
+ if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out logToken))
+ {
+ throw new Exception("No valid hybrid log checkpoint to recover from");
+ }
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs
similarity index 76%
rename from cs/src/core/Index/FASTER/Recovery.cs
rename to cs/src/core/Index/Recovery/Recovery.cs
index 5ac34edc3..81bf73da4 100644
--- a/cs/src/core/Index/FASTER/Recovery.cs
+++ b/cs/src/core/Index/Recovery/Recovery.cs
@@ -62,66 +62,10 @@ public unsafe partial class FasterKV f.LastWriteTime).First();
- if(latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out Guid indexCheckpointGuid))
- {
- throw new Exception("No valid index checkpoint to recover from");
- }
-
-
- var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder());
- dirs = hlogCheckpointDir.GetDirectories();
- foreach (var dir in dirs)
- {
- // Remove incomplete checkpoints
- if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
- {
- Directory.Delete(dir.FullName, true);
- }
- }
- var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First();
- if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out Guid hybridLogCheckpointGuid))
- {
- throw new Exception("No valid hybrid log checkpoint to recover from");
- }
-
+ checkpointManager.GetLatestCheckpoint(out Guid indexCheckpointGuid, out Guid hybridLogCheckpointGuid);
InternalRecover(indexCheckpointGuid, hybridLogCheckpointGuid);
}
- private bool IsCheckpointSafe(Guid token, CheckpointType checkpointType)
- {
- switch (checkpointType)
- {
- case CheckpointType.INDEX_ONLY:
- {
- var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(token));
- return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat");
- }
- case CheckpointType.HYBRID_LOG_ONLY:
- {
- var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(token));
- return File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat");
- }
- case CheckpointType.FULL:
- {
- return IsCheckpointSafe(token, CheckpointType.INDEX_ONLY)
- && IsCheckpointSafe(token, CheckpointType.HYBRID_LOG_ONLY);
- }
- default:
- return false;
- }
- }
-
private bool IsCompatible(IndexRecoveryInfo indexInfo, HybridLogRecoveryInfo recoveryInfo)
{
var l1 = indexInfo.finalLogicalAddress;
@@ -135,20 +79,13 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)
Debug.WriteLine("Index Checkpoint: {0}", indexToken);
Debug.WriteLine("HybridLog Checkpoint: {0}", hybridLogToken);
- // Assert corresponding checkpoints are safe to recover from
- Debug.Assert(IsCheckpointSafe(indexToken, CheckpointType.INDEX_ONLY),
- "Cannot recover from incomplete index checkpoint " + indexToken.ToString());
-
- Debug.Assert(IsCheckpointSafe(hybridLogToken, CheckpointType.HYBRID_LOG_ONLY),
- "Cannot recover from incomplete hybrid log checkpoint " + hybridLogToken.ToString());
-
// Recovery appropriate context information
var recoveredICInfo = new IndexCheckpointInfo();
- recoveredICInfo.Recover(indexToken, directoryConfiguration);
+ recoveredICInfo.Recover(indexToken, checkpointManager);
recoveredICInfo.info.DebugPrint();
var recoveredHLCInfo = new HybridLogCheckpointInfo();
- recoveredHLCInfo.Recover(hybridLogToken, directoryConfiguration);
+ recoveredHLCInfo.Recover(hybridLogToken, checkpointManager);
recoveredHLCInfo.info.DebugPrint();
// Check if the two checkpoints are compatible for recovery
@@ -184,64 +121,56 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken)
// Read appropriate hybrid log pages into memory
- RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress);
+ RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress);
// Recover session information
- _recoveredSessions = new SafeConcurrentDictionary();
- foreach(var sessionInfo in recoveredHLCInfo.info.continueTokens)
- {
-
- _recoveredSessions.GetOrAdd(sessionInfo.Key, sessionInfo.Value);
- }
+ _recoveredSessions = recoveredHLCInfo.info.continueTokens;
}
- private void RestoreHybridLog(long untilAddress)
+ private void RestoreHybridLog(long untilAddress, long headAddress)
{
-
- var tailPage = hlog.GetPage(untilAddress);
- var headPage = default(long);
- if (untilAddress > hlog.GetStartLogicalAddress(tailPage))
+ // Special case: we do not load any records into memory
+ if (headAddress == untilAddress)
{
- headPage = (tailPage + 1) - hlog.GetHeadOffsetLagInPages(); ;
+ hlog.AllocatePage(hlog.GetPageIndexForAddress(headAddress));
}
else
{
- headPage = tailPage - hlog.GetHeadOffsetLagInPages();
- }
- headPage = headPage > 0 ? headPage : 0;
+ var tailPage = hlog.GetPage(untilAddress);
+ var headPage = hlog.GetPage(headAddress);
- var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
- for (int i = 0; i < recoveryStatus.capacity; i++)
- {
- recoveryStatus.readStatus[i] = ReadStatus.Done;
- }
+ var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
+ for (int i = 0; i < recoveryStatus.capacity; i++)
+ {
+ recoveryStatus.readStatus[i] = ReadStatus.Done;
+ }
- var numPages = 0;
- for (var page = headPage; page <= tailPage; page++)
- {
- var pageIndex = hlog.GetPageIndexForPage(page);
- recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
- numPages++;
- }
+ var numPages = 0;
+ for (var page = headPage; page <= tailPage; page++)
+ {
+ var pageIndex = hlog.GetPageIndexForPage(page);
+ recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+ numPages++;
+ }
- hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+ hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
- var done = false;
- while (!done)
- {
- done = true;
- for (long page = headPage; page <= tailPage; page++)
+ var done = false;
+ while (!done)
{
- int pageIndex = hlog.GetPageIndexForPage(page);
- if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+ done = true;
+ for (long page = headPage; page <= tailPage; page++)
{
- done = false;
- break;
+ int pageIndex = hlog.GetPageIndexForPage(page);
+ if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+ {
+ done = false;
+ break;
+ }
}
}
}
- var headAddress = hlog.GetFirstValidLogicalAddress(headPage);
hlog.RecoveryReset(untilAddress, headAddress);
}
@@ -341,8 +270,8 @@ private void RecoverHybridLogFromSnapshotFile(
// By default first page has one extra record
var capacity = hlog.GetCapacityNumPages();
- var recoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogCheckpointFileName(recoveryInfo.guid), false);
- var objectLogRecoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogObjectCheckpointFileName(recoveryInfo.guid), false);
+ var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid);
+ var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid);
recoveryDevice.Initialize(hlog.GetSegmentSize());
objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize());
var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress)
diff --git a/cs/src/core/Utilities/BufferPool.cs b/cs/src/core/Utilities/BufferPool.cs
index 872eb8d4e..cfcb4eb77 100644
--- a/cs/src/core/Utilities/BufferPool.cs
+++ b/cs/src/core/Utilities/BufferPool.cs
@@ -133,6 +133,11 @@ public void Return(SectorAlignedMemory page)
Array.Clear(page.buffer, 0, page.buffer.Length);
if (!Disabled)
queue[page.level].Enqueue(page);
+ else
+ {
+ page.handle.Free();
+ page.buffer = null;
+ }
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
new file mode 100644
index 000000000..cf3491e57
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
@@ -0,0 +1,179 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.IO;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using FASTER.core;
+using Microsoft.Azure.Storage;
+using Microsoft.Azure.Storage.Blob;
+
+namespace FASTER.devices
+{
+ ///
+ /// An IDevice implementation that is backed by Azure Page Blob.
+ /// This device is slower than a local SSD or HDD, but provides scalability and shared access in the cloud.
+ ///
+ public class AzureStorageDevice : StorageDeviceBase
+ {
+ private CloudBlobContainer container;
+ private readonly ConcurrentDictionary blobs;
+ private readonly string blobName;
+ private readonly bool deleteOnClose;
+
+ // Page Blobs permit blobs of max size 8 TB, but the emulator permits only 2 GB
+ private const long MAX_BLOB_SIZE = (long)(2 * 10e8);
+ // Azure Page Blobs have a fixed sector size of 512 bytes.
+ private const uint PAGE_BLOB_SECTOR_SIZE = 512;
+
+ ///
+ /// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
+ ///
+ /// The connection string to use when establishing connection to Azure Blobs
+ /// Name of the Azure Blob container to use. If there does not exist a container with the supplied name, one is created
+ /// A descriptive name that will be the prefix of all blobs created with this device
+ ///
+ /// True if the program should delete all blobs created on call to Close. False otherwise.
+ /// The container is not deleted even if it was created in this constructor
+ ///
+ public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false)
+ : base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE)
+ {
+ CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
+ CloudBlobClient client = storageAccount.CreateCloudBlobClient();
+ container = client.GetContainerReference(containerName);
+ container.CreateIfNotExists();
+ blobs = new ConcurrentDictionary();
+ this.blobName = blobName;
+ this.deleteOnClose = deleteOnClose;
+ }
+
+ ///
+ /// Inherited
+ ///
+ public override void Close()
+ {
+ // Unlike in LocalStorageDevice, we explicitly remove all page blobs if the deleteOnClose flag is set, instead of relying on the operating system
+ // to delete files after the end of our process. This leads to potential problems if multiple instances are sharing the same underlying page blobs.
+ //
+ // Since this flag is presumably only used for testing though, it is probably fine.
+ if (deleteOnClose)
+ {
+ foreach (var entry in blobs)
+ {
+ entry.Value.GetPageBlob().Delete();
+ }
+ }
+ }
+
+ ///
+ /// Inherited
+ ///
+ public override void DeleteSegmentRange(int fromSegment, int toSegment)
+ {
+ for (int i = fromSegment; i < toSegment; i++)
+ {
+ if (blobs.TryRemove(i, out BlobEntry blob))
+ {
+ blob.GetPageBlob().Delete();
+ }
+ }
+ }
+
+ ///
+ /// Inherited
+ ///
+ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult)
+ {
+ // It is up to the allocator to make sure no reads are issued to segments before they are written
+ if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) throw new InvalidOperationException("Attempting to read non-existent segments");
+
+ // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
+ Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
+ NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
+
+ UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write);
+ CloudPageBlob pageBlob = blobEntry.GetPageBlob();
+ pageBlob.BeginDownloadRangeToStream(stream, (Int64)sourceAddress, readLength, ar => {
+ try
+ {
+ pageBlob.EndDownloadRangeToStream(ar);
+ }
+ // I don't think I can be more specific in catch here because no documentation on exception behavior is provided
+ catch (Exception e)
+ {
+ Trace.TraceError(e.Message);
+ // Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
+ // but does not distinguish between them.
+ callback(2, readLength, ovNative);
+ }
+ callback(0, readLength, ovNative);
+ }, asyncResult);
+ }
+
+ ///
+ /// Inherited
+ ///
+ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+ {
+ if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
+ {
+ BlobEntry entry = new BlobEntry();
+ if (blobs.TryAdd(segmentId, entry))
+ {
+ CloudPageBlob pageBlob = container.GetPageBlobReference(blobName + segmentId);
+ // If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because
+ // page blobs are not backed by real pages on creation, and the given size is only the physical limit of
+ // how large it can grow to.
+ var size = segmentSize == -1 ? MAX_BLOB_SIZE : segmentSize;
+ // If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
+ // After creation is done, we can call write.
+ entry.CreateAsync(size, pageBlob);
+ }
+ // Otherwise, some other thread beat us to it. Okay to use their blobs.
+ blobEntry = blobs[segmentId];
+ }
+ TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+ {
+ CloudPageBlob pageBlob = blobEntry.GetPageBlob();
+ // If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
+ if (pageBlob == null
+ && blobEntry.TryQueueAction(p => WriteToBlobAsync(p, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult))) return;
+
+ // Otherwise, invoke directly.
+ WriteToBlobAsync(pageBlob, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+ {
+ // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
+ Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
+ NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
+ UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)sourceAddress, numBytesToWrite);
+ blob.BeginWritePages(stream, (long)destinationAddress, null, ar =>
+ {
+ try
+ {
+ blob.EndWritePages(ar);
+ }
+ // I don't think I can be more specific in catch here because no documentation on exception behavior is provided
+ catch (Exception e)
+ {
+ Trace.TraceError(e.Message);
+ // Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
+ // but does not distinguish between them.
+ callback(1, numBytesToWrite, ovNative);
+ }
+ callback(0, numBytesToWrite, ovNative);
+ }, asyncResult);
+ }
+ }
+}
diff --git a/cs/src/devices/AzureStorageDevice/BlobEntry.cs b/cs/src/devices/AzureStorageDevice/BlobEntry.cs
new file mode 100644
index 000000000..e70d20b6d
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/BlobEntry.cs
@@ -0,0 +1,106 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Threading;
+using Microsoft.Azure.Storage.Blob;
+
+namespace FASTER.devices
+{
+ // This class bundles a page blob object with a queue and a counter to ensure
+ // 1) BeginCreate is not called more than once
+ // 2) No writes are issued before EndCreate
+ // The creator of a BlobEntry is responsible for populating the object with an underlying Page Blob. Any subsequent callers
+ // either directly write to the created page blob, or queues the write so the creator can clear it after creation is complete.
+ // In-progress creation is denoted by a null value on the underlying page blob
+ class BlobEntry
+ {
+ private CloudPageBlob pageBlob;
+ private ConcurrentQueue> pendingWrites;
+ private int waitingCount;
+
+ ///
+ /// Creates a new BlobEntry, does not initialize a page blob. Use
+ /// for actual creation.
+ ///
+ public BlobEntry()
+ {
+ pageBlob = null;
+ pendingWrites = new ConcurrentQueue>();
+ waitingCount = 0;
+ }
+
+ ///
+ /// Getter for the underlying
+ ///
+ /// the underlying , or null if there is none
+ public CloudPageBlob GetPageBlob()
+ {
+ return pageBlob;
+ }
+
+ ///
+ /// Asynchronously invoke create on the given pageBlob.
+ ///
+ /// maximum size of the blob
+ /// The page blob to create
+ public void CreateAsync(long size, CloudPageBlob pageBlob)
+ {
+ Debug.Assert(waitingCount == 0, "Create should be called on blobs that don't already exist and exactly once");
+ // Asynchronously create the blob
+ pageBlob.BeginCreate(size, ar =>
+ {
+ try
+ {
+ pageBlob.EndCreate(ar);
+ }
+ catch (Exception e)
+ {
+ // TODO(Tianyu): Can't really do better without knowing error behavior
+ Trace.TraceError(e.Message);
+ }
+ // At this point the blob is fully created. After this line all consequent writers will write immediately. We just
+ // need to clear the queue of pending writers.
+ this.pageBlob = pageBlob;
+ // Take a snapshot of the current waiting count. Exactly this many actions will be cleared.
+ // Swapping in -1 will inform any stragglers that we are not taking their actions and prompt them to retry (and call write directly)
+ int waitingCountSnapshot = Interlocked.Exchange(ref waitingCount, -1);
+ Action action;
+ // Clear actions
+ for (int i = 0; i < waitingCountSnapshot; i++)
+ {
+ // inserts into the queue may lag behind the creation thread. We have to wait until that happens.
+ // This is so rare, that we are probably okay with a busy wait.
+ while (!pendingWrites.TryDequeue(out action)) { }
+ action(pageBlob);
+ }
+ // Mark for deallocation for the GC
+ pendingWrites = null;
+ }, null);
+ }
+
+ ///
+ /// Attempts to enqueue an action to be invoked by the creator after creation is done. Should only be invoked when
+ /// creation is in-flight. This call is allowed to fail (and return false) if concurrently the creation is complete.
+ /// The caller should call the write action directly instead of queueing in this case.
+ ///
+ /// The write action to perform
+ /// Whether the action was successfully enqueued
+ public bool TryQueueAction(Action writeAction)
+ {
+ int currentCount;
+ do
+ {
+ currentCount = waitingCount;
+ // If current count became -1, creation is complete. New queue entries will not be processed and we must call the action ourselves.
+ if (currentCount == -1) return false;
+ } while (Interlocked.CompareExchange(ref waitingCount, currentCount + 1, currentCount) != currentCount);
+ // Enqueue last. The creation thread is obliged to wait until it has processed waitingCount many actions.
+ // It is extremely unlikely that we will get scheduled out here anyways.
+ pendingWrites.Enqueue(writeAction);
+ return true;
+ }
+ }
+}
diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj
new file mode 100644
index 000000000..de06d0e90
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj
@@ -0,0 +1,44 @@
+
+
+
+ netstandard2.0;net46
+ AnyCPU;x64
+
+
+
+ true
+ FASTER.devices
+ FASTER.devices.AzureStorageDevice
+ prompt
+ true
+
+ Library
+
+ ../../../FASTER.snk
+ false
+ bin\$(Platform)\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml
+
+
+
+ TRACE;DEBUG
+ full
+ bin\$(Platform)\Debug\
+
+
+ TRACE
+ pdbonly
+ true
+ bin\$(Platform)\Release\
+
+
+ $(DefineConstants);DOTNETCORE
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec
new file mode 100644
index 000000000..3f66e36c9
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec
@@ -0,0 +1,36 @@
+
+
+
+ FASTER.devices.AzureStorageDevice
+ $version$
+ FASTER.devices.AzureStorageDevice
+ Microsoft
+ Microsoft
+ https://github.com/Microsoft/FASTER
+ MIT
+ true
+ This is a FASTER device implementation for Azure Storage (page blobs). FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data.
+ See the project website at https://github.com/Microsoft/FASTER for more details
+ © Microsoft Corporation. All rights reserved.
+ en-US
+ key-value store dictionary hashtable concurrent log persistent azure storage FASTER
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cs/test/BasicDiskFASTERTests.cs b/cs/test/BasicDiskFASTERTests.cs
index a84abb206..0d8ad497f 100644
--- a/cs/test/BasicDiskFASTERTests.cs
+++ b/cs/test/BasicDiskFASTERTests.cs
@@ -10,37 +10,37 @@
using FASTER.core;
using System.IO;
using NUnit.Framework;
+using FASTER.devices;
+using System.Diagnostics;
namespace FASTER.test
{
-
[TestFixture]
- internal class BasicDiskFASTERTests
+ internal class BasicStorageFASTERTests
{
private FasterKV fht;
- private IDevice log;
+ public const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
+ public const string TEST_CONTAINER = "test";
- [SetUp]
- public void Setup()
+ [Test]
+ public void LocalStorageWriteRead()
{
- log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true);
- fht = new FasterKV
- (1L<<20, new Functions(), new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10 });
- fht.StartSession();
+ TestDeviceWriteRead(Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true));
}
- [TearDown]
- public void TearDown()
+ [Test]
+ public void PageBlobWriteRead()
{
- fht.StopSession();
- fht.Dispose();
- fht = null;
- log.Close();
+ if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
+ TestDeviceWriteRead(new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false));
}
- [Test]
- public void NativeDiskWriteRead()
+ void TestDeviceWriteRead(IDevice log)
{
+ fht = new FasterKV
+ (1L << 20, new Functions(), new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10 });
+ fht.StartSession();
+
InputStruct input = default(InputStruct);
for (int i = 0; i < 2000; i++)
@@ -86,6 +86,11 @@ public void NativeDiskWriteRead()
}
}
}
+
+ fht.StopSession();
+ fht.Dispose();
+ fht = null;
+ log.Close();
}
}
}
diff --git a/cs/test/FASTER.test.csproj b/cs/test/FASTER.test.csproj
index a6c0cc437..41d0226cc 100644
--- a/cs/test/FASTER.test.csproj
+++ b/cs/test/FASTER.test.csproj
@@ -44,5 +44,6 @@
+
diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/FullRecoveryTests.cs
index 12d91aded..b07f129be 100644
--- a/cs/test/FullRecoveryTests.cs
+++ b/cs/test/FullRecoveryTests.cs
@@ -170,7 +170,7 @@ public void RecoverAndTest(Guid cprVersion, Guid indexVersion)
// Test outputs
var checkpointInfo = default(HybridLogRecoveryInfo);
- checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path));
+ checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path));
// Compute expected array
long[] expected = new long[numUniqueKeys];
diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs
index 670afbce7..10287f21d 100644
--- a/cs/test/ObjectRecoveryTest.cs
+++ b/cs/test/ObjectRecoveryTest.cs
@@ -189,7 +189,7 @@ public unsafe void RecoverAndTest(Guid cprVersion, Guid indexVersion)
// Test outputs
var checkpointInfo = default(HybridLogRecoveryInfo);
- checkpointInfo.Recover(cprVersion, new DirectoryConfiguration(test_path));
+ checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path));
// Compute expected array
long[] expected = new long[numUniqueKeys];
diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs
index 849956bce..7c4e5c9ee 100644
--- a/cs/test/SharedDirectoryTests.cs
+++ b/cs/test/SharedDirectoryTests.cs
@@ -153,7 +153,7 @@ private void Populate(FasterKV
private void Test(FasterTestInstance fasterInstance, Guid checkpointToken)
{
var checkpointInfo = default(HybridLogRecoveryInfo);
- Assert.IsTrue(checkpointInfo.Recover(checkpointToken, new DirectoryConfiguration(fasterInstance.CheckpointDirectory)));
+ checkpointInfo.Recover(checkpointToken, new LocalCheckpointManager(fasterInstance.CheckpointDirectory));
// Create array for reading
var inputArray = new Input[numUniqueKeys];
diff --git a/docs/Roadmap.md b/docs/Roadmap.md
index 9c415d9df..3ef23a8cc 100644
--- a/docs/Roadmap.md
+++ b/docs/Roadmap.md
@@ -45,6 +45,7 @@ Completed items are included to provide the context and progress of the work.
* [x] Support for runtime shifting of address markers (e.g., begin, head, read-only) for dynamic tuning of memory contents of allocators (log and read cache).
* [x] Log compaction by rolling forward older active keys
* [x] Support for subscribing to the hybrid log (push-based, as record batches become read-only): [PR](https://github.com/Microsoft/FASTER/pull/133)
+* [x] Support for callback when records in hybrid log become read-only: [PR](https://github.com/microsoft/FASTER/pull/133)
#### Checkpoint and Recovery
@@ -57,6 +58,7 @@ Completed items are included to provide the context and progress of the work.
* [ ] Better integration with an async/await threading model in C#: [PR](https://github.com/Microsoft/FASTER/pull/130)
* [ ] Support for cloud storage, starting with Azure Page Blobs: [PR](https://github.com/Microsoft/FASTER/pull/147)
* [ ] Support for tiered storage: [PR](https://github.com/Microsoft/FASTER/pull/151)
+* [ ] Make checkpointing use a pluggable user-specified interface for providing devices and performing metadata commit: [PR](https://github.com/microsoft/FASTER/pull/161)
* [ ] Support for sharded storage
* [ ] Scale-out and elasticity support
* [ ] Checksums for storage pages
@@ -66,6 +68,17 @@ Completed items are included to provide the context and progress of the work.
## Release Notes
+#### FASTER v2019.7.23.1
+
+* [x] Object log recovery bug fix: [PR](https://github.com/microsoft/FASTER/pull/158)
+* [x] Option to enable file buffering for local storage device
+* [x] Optimizing what is loaded to hybrid log memory during recovery (prior head address onwards only)
+* [x] Removing direct call of callback when IO completes synchronously: [PR](https://github.com/microsoft/FASTER/pull/155)
+* [x] Fixed checkpoint recovery bug: [PR](https://github.com/microsoft/FASTER/pull/144)
+* [x] Adding FILE_SHARE_DELETE when deleteOnClose is used: [PR](https://github.com/microsoft/FASTER/pull/134)
+* [x] Support for callback when records in hybrid log become read-only: [PR](https://github.com/microsoft/FASTER/pull/133)
+
+
#### FASTER v2019.4.24.4
* [x] Added support for variable sized (inline) structs without object log: [PR](https://github.com/Microsoft/FASTER/pull/120)
* [x] Removed statics from codebase to better support multiple instances: [PR](https://github.com/Microsoft/FASTER/pull/117)
@@ -111,3 +124,4 @@ guidelines. It supports the following features as of now:
* [x] Log segments on storage, with truncation from head of log
* [x] CPR-based checkpointing and recovery (both snapshot and fold-over modes), see [[here](https://microsoft.github.io/FASTER/#recovery-in-faster)]
* [x] Ability to resize the hash table
+* [x] C++: Added a new `value_size()` method to `RmwContext` for RCU operations: [PR](https://github.com/microsoft/FASTER/pull/145)