Skip to content

Commit

Permalink
[C#] Support index checkpointing with read cache enabled (#382)
Browse files Browse the repository at this point in the history
* [C#] Support index checkpointing with read cache enabled

* Write index in 32MB chunks when read cache is enabled

* Add check to ensure LocalStorageDevice is not used from non-Windows OS platform.

* Updated testcases to not print anything to stdout, sped up some tests
  • Loading branch information
badrishc authored Dec 2, 2020
1 parent 32b226d commit 7021cd9
Show file tree
Hide file tree
Showing 20 changed files with 274 additions and 86 deletions.
65 changes: 50 additions & 15 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,17 +450,6 @@ public void Dispose()

#region Checkpoint

/// <summary>
/// Public facing persistence API
/// </summary>
/// <param name="device"></param>
/// <param name="start_offset"></param>
/// <param name="numBytes"></param>
public void TakeCheckpoint(IDevice device, ulong start_offset, out ulong numBytes)
{
BeginCheckpoint(device, start_offset, out numBytes);
}

/// <summary>
/// Is checkpoint complete
/// </summary>
Expand All @@ -481,7 +470,25 @@ public async ValueTask IsCheckpointCompletedAsync(CancellationToken token = defa
s.Release();
}

internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWritten)
/// <summary>
/// Public facing persistence API
/// </summary>
/// <param name="device"></param>
/// <param name="offset"></param>
/// <param name="numBytesWritten"></param>
public void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWritten)
=> BeginCheckpoint(device, offset, out numBytesWritten, false, default, default);

/// <summary>
/// Internal persistence API
/// </summary>
/// <param name="device"></param>
/// <param name="offset"></param>
/// <param name="numBytesWritten"></param>
/// <param name="useReadCache"></param>
/// <param name="skipReadCache"></param>
/// <param name="epoch"></param>
internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWritten, bool useReadCache, SkipReadCache skipReadCache, LightEpoch epoch)
{
int localCount = count;
int recordsCountInLastLevel = localCount & PageSizeMask;
Expand All @@ -492,15 +499,40 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num
uint alignedPageSize = PageSize * (uint)RecordSize;
uint lastLevelSize = (uint)recordsCountInLastLevel * (uint)RecordSize;


int sectorSize = (int)device.SectorSize;
numBytesWritten = 0;
for (int i = 0; i < numLevels; i++)
{
OverflowPagesFlushAsyncResult result = default;

uint writeSize = (uint)((i == numCompleteLevels) ? (lastLevelSize + (sectorSize - 1)) & ~(sectorSize - 1) : alignedPageSize);

device.WriteAsync(pointers[i], offset + numBytesWritten, writeSize, AsyncFlushCallback, result);
if (!useReadCache)
{
device.WriteAsync(pointers[i], offset + numBytesWritten, writeSize, AsyncFlushCallback, result);
}
else
{
result.mem = new SectorAlignedMemory((int)writeSize, (int)device.SectorSize);
bool prot = false;
if (!epoch.ThisInstanceProtected())
{
prot = true;
epoch.Resume();
}

Buffer.MemoryCopy((void*)pointers[i], result.mem.aligned_pointer, writeSize, writeSize);
int j = 0;
if (i == 0) j += kAllocateChunkSize*RecordSize;
for (; j < writeSize; j += sizeof(HashBucket))
{
skipReadCache((HashBucket*)(result.mem.aligned_pointer + j));
}

if (prot) epoch.Suspend();

device.WriteAsync((IntPtr)result.mem.aligned_pointer, offset + numBytesWritten, writeSize, AsyncFlushCallback, result);
}
numBytesWritten += writeSize;
}
}
Expand All @@ -509,9 +541,12 @@ private unsafe void AsyncFlushCallback(uint errorCode, uint numBytes, object con
{
if (errorCode != 0)
{
System.Diagnostics.Trace.TraceError("AsyncFlushCallback error: {0}", errorCode);
Trace.TraceError("AsyncFlushCallback error: {0}", errorCode);
}

var mem = ((OverflowPagesFlushAsyncResult)context).mem;
mem?.Dispose();

if (Interlocked.Decrement(ref checkpointCallbackCount) == 0)
{
checkpointSemaphore.Release();
Expand Down
7 changes: 7 additions & 0 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ protected internal LocalStorageDevice(string filename,
IEnumerable<KeyValuePair<int, SafeFileHandle>> initialLogFileHandles = null)
: base(filename, GetSectorSize(filename), capacity)
{
#if NETSTANDARD
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
throw new FasterException("Cannot use LocalStorageDevice from non-Windows OS platform, use ManagedLocalStorageDevice instead.");
}
#endif

if (UsePrivileges && preallocateFile)
Native32.EnableProcessPrivileges();

Expand Down
22 changes: 22 additions & 0 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1986,6 +1986,28 @@ private void SkipReadCache(ref long logicalAddress)
}
}

private void SkipReadCacheBucket(HashBucket* bucket)
{
for (int index = 0; index < Constants.kOverflowBucketIndex; ++index)
{
HashBucketEntry* entry = (HashBucketEntry*)&bucket->bucket_entries[index];
if (0 == entry->word)
continue;

if (!entry->ReadCache) continue;
var logicalAddress = entry->Address;
var physicalAddress = readcache.GetPhysicalAddress(logicalAddress & ~Constants.kReadCacheBitMask);

while (true)
{
logicalAddress = readcache.GetInfo(physicalAddress).PreviousAddress;
entry->Address = logicalAddress;
if (!entry->ReadCache) break;
physicalAddress = readcache.GetPhysicalAddress(logicalAddress & ~Constants.kReadCacheBitMask);
}
}
}

private void SkipAndInvalidateReadCache(ref long logicalAddress, ref Key key)
{
HashBucketEntry entry = default;
Expand Down
67 changes: 44 additions & 23 deletions cs/src/core/Index/Recovery/IndexCheckpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,37 @@

namespace FASTER.core
{
public partial class FasterBase
internal unsafe delegate void SkipReadCache(HashBucket* bucket);

public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{
// Derived class facing persistence API
internal IndexCheckpointInfo _indexCheckpoint;

internal void TakeIndexFuzzyCheckpoint()
internal unsafe void TakeIndexFuzzyCheckpoint()
{
var ht_version = resizeInfo.version;

TakeMainIndexCheckpoint(ht_version,
_indexCheckpoint.main_ht_device,
out ulong ht_num_bytes_written);
BeginMainIndexCheckpoint(ht_version, _indexCheckpoint.main_ht_device, out ulong ht_num_bytes_written, UseReadCache, SkipReadCacheBucket);

var sectorSize = _indexCheckpoint.main_ht_device.SectorSize;
var alignedIndexSize = (uint)((ht_num_bytes_written + (sectorSize - 1)) & ~(sectorSize - 1));
overflowBucketsAllocator.TakeCheckpoint(_indexCheckpoint.main_ht_device, alignedIndexSize, out ulong ofb_num_bytes_written);
overflowBucketsAllocator.BeginCheckpoint(_indexCheckpoint.main_ht_device, alignedIndexSize, out ulong ofb_num_bytes_written, UseReadCache, SkipReadCacheBucket, epoch);
_indexCheckpoint.info.num_ht_bytes = ht_num_bytes_written;
_indexCheckpoint.info.num_ofb_bytes = ofb_num_bytes_written;
}
}

public partial class FasterBase
{
internal void TakeIndexFuzzyCheckpoint(int ht_version, IDevice device,
out ulong numBytesWritten, IDevice ofbdevice,
out ulong ofbnumBytesWritten, out int num_ofb_buckets)
{
TakeMainIndexCheckpoint(ht_version, device, out numBytesWritten);
BeginMainIndexCheckpoint(ht_version, device, out numBytesWritten);
var sectorSize = device.SectorSize;
var alignedIndexSize = (uint)((numBytesWritten + (sectorSize - 1)) & ~(sectorSize - 1));
overflowBucketsAllocator.TakeCheckpoint(ofbdevice, alignedIndexSize, out ofbnumBytesWritten);
overflowBucketsAllocator.BeginCheckpoint(ofbdevice, alignedIndexSize, out ofbnumBytesWritten);
num_ofb_buckets = overflowBucketsAllocator.GetMaxValidAddress();
}

Expand All @@ -61,22 +64,17 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken
private int mainIndexCheckpointCallbackCount;
private SemaphoreSlim mainIndexCheckpointSemaphore;

private void TakeMainIndexCheckpoint(int tableVersion,
IDevice device,
out ulong numBytes)
{
BeginMainIndexCheckpoint(tableVersion, device, out numBytes);
}

private unsafe void BeginMainIndexCheckpoint(
int version,
IDevice device,
out ulong numBytesWritten)
internal unsafe void BeginMainIndexCheckpoint(int version, IDevice device, out ulong numBytesWritten, bool useReadCache = false, SkipReadCache skipReadCache = default)
{
long totalSize = state[version].size * sizeof(HashBucket);

int numChunks = 1;
if (totalSize > uint.MaxValue)
if (useReadCache && (totalSize > (1L << 25)))
{
numChunks = (int)Math.Ceiling((double)totalSize / (1L << 25));
numChunks = (int)Math.Pow(2, Math.Ceiling(Math.Log(numChunks, 2)));
}
else if (totalSize > uint.MaxValue)
{
numChunks = (int)Math.Ceiling((double)totalSize / (long)uint.MaxValue);
numChunks = (int)Math.Pow(2, Math.Ceiling(Math.Log(numChunks, 2)));
Expand All @@ -90,10 +88,32 @@ private unsafe void BeginMainIndexCheckpoint(
numBytesWritten = 0;
for (int index = 0; index < numChunks; index++)
{
long chunkStartBucket = (long)start + (index * chunkSize);
IntPtr chunkStartBucket = (IntPtr)((byte*)start + (index * chunkSize));
HashIndexPageAsyncFlushResult result = default;
result.chunkIndex = index;
device.WriteAsync((IntPtr)chunkStartBucket, numBytesWritten, chunkSize, AsyncPageFlushCallback, result);
if (!useReadCache)
{
device.WriteAsync(chunkStartBucket, numBytesWritten, chunkSize, AsyncPageFlushCallback, result);
}
else
{
result.mem = new SectorAlignedMemory((int)chunkSize, (int)device.SectorSize);
bool prot = false;
if (!epoch.ThisInstanceProtected())
{
prot = true;
epoch.Resume();
}
Buffer.MemoryCopy((void*)chunkStartBucket, result.mem.aligned_pointer, chunkSize, chunkSize);
for (int j = 0; j < chunkSize; j += sizeof(HashBucket))
{
skipReadCache((HashBucket*)(result.mem.aligned_pointer + j));
}
if (prot)
epoch.Suspend();

device.WriteAsync((IntPtr)result.mem.aligned_pointer, numBytesWritten, chunkSize, AsyncPageFlushCallback, result);
}
numBytesWritten += chunkSize;
}
}
Expand All @@ -113,7 +133,8 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to
private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
{
// Set the page status to flushed
_ = (HashIndexPageAsyncFlushResult)context;
var mem = ((HashIndexPageAsyncFlushResult)context).mem;
mem?.Dispose();

if (errorCode != 0)
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ private unsafe void BeginMainIndexRecovery(
ulong numBytesRead = 0;
for (int index = 0; index < numChunks; index++)
{
long chunkStartBucket = (long)start + (index * chunkSize);
IntPtr chunkStartBucket = (IntPtr)(((byte*)start) + (index * chunkSize));
HashIndexPageAsyncReadResult result = default;
result.chunkIndex = index;
device.ReadAsync(numBytesRead, (IntPtr)chunkStartBucket, chunkSize, AsyncPageReadCallback, result);
device.ReadAsync(numBytesRead, chunkStartBucket, chunkSize, AsyncPageReadCallback, result);
numBytesRead += chunkSize;
}
Debug.Assert(numBytesRead == num_bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public void GlobalBeforeEnteringState<Key, Value>(
}

faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.startLogicalAddress);
if (faster.UseReadCache && faster.ReadCache.BeginAddress != faster.ReadCache.TailAddress)
throw new FasterException("Index checkpoint with read cache is not supported");
faster.TakeIndexFuzzyCheckpoint();
break;

Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Utilities/AsyncResultTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ internal struct AsyncGetFromDiskResult<TContext>
internal unsafe struct HashIndexPageAsyncFlushResult
{
public int chunkIndex;
}
public SectorAlignedMemory mem;
}

internal unsafe struct HashIndexPageAsyncReadResult
{
Expand All @@ -25,6 +26,7 @@ internal unsafe struct HashIndexPageAsyncReadResult

internal struct OverflowPagesFlushAsyncResult
{
public SectorAlignedMemory mem;
}

internal struct OverflowPagesReadAsyncResult
Expand Down
30 changes: 30 additions & 0 deletions cs/src/core/Utilities/BufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,36 @@ public unsafe sealed class SectorAlignedMemory
internal int level;
internal SectorAlignedBufferPool pool;

/// <summary>
/// Default constructor
/// </summary>
public SectorAlignedMemory() { }

/// <summary>
/// Create new instance of SectorAlignedMemory
/// </summary>
/// <param name="numRecords"></param>
/// <param name="sectorSize"></param>
public SectorAlignedMemory(int numRecords, int sectorSize)
{
int recordSize = 1;
int requiredSize = sectorSize + (((numRecords) * recordSize + (sectorSize - 1)) & ~(sectorSize - 1));

buffer = new byte[requiredSize];
handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
aligned_pointer = (byte*)(((long)handle.AddrOfPinnedObject() + (sectorSize - 1)) & ~(sectorSize - 1));
offset = (int)((long)aligned_pointer - (long)handle.AddrOfPinnedObject());
}

/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
handle.Free();
buffer = null;
}

/// <summary>
/// Return
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion cs/test/AsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ public void DeleteCompletionCallback(ref AdId key, Empty ctx)

public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
Console.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
4 changes: 2 additions & 2 deletions cs/test/BasicDiskFASTERTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void TestDeviceWriteRead(IDevice log)

InputStruct input = default;

for (int i = 0; i < 2000; i++)
for (int i = 0; i < 700; i++)
{
var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 };
var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 };
Expand All @@ -92,7 +92,7 @@ void TestDeviceWriteRead(IDevice log)
}


for (int i = 0; i < 2000; i++)
for (int i = 0; i < 700; i++)
{
OutputStruct output = default;
var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 };
Expand Down
2 changes: 1 addition & 1 deletion cs/test/ObjectRecoveryTest2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void DeleteDirectory(string path)
[Test]
public async ValueTask ObjectRecoveryTest2(
[Values]CheckpointType checkpointType,
[Range(100, 1500, 600)] int iterations,
[Range(100, 700, 300)] int iterations,
[Values]bool isAsync)
{
this.iterations = iterations;
Expand Down
2 changes: 1 addition & 1 deletion cs/test/ObjectRecoveryTest3.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void DeleteDirectory(string path)
[Test]
public async ValueTask ObjectRecoveryTest3(
[Values]CheckpointType checkpointType,
[Values(2000)] int iterations,
[Values(1000)] int iterations,
[Values]bool isAsync)
{
this.iterations = iterations;
Expand Down
1 change: 0 additions & 1 deletion cs/test/ObjectRecoveryTestTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public void DeleteCompletionCallback(ref AdId key, Empty ctx)

public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
Console.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
Loading

0 comments on commit 7021cd9

Please sign in to comment.