Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Checkpoint, Recovery, RecordInfo updates #588

Merged
merged 9 commits into from
Nov 2, 2021
2 changes: 1 addition & 1 deletion cs/benchmark/FasterSpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys
for (int i = 0; i < 8; i++)
input_[i].value = i;

device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true);
device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true);

if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV<SpanByte, SpanByte>
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade
for (int i = 0; i < 8; i++)
input_[i].value = i;

device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true, useIoCompletionPort: true);
device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true);

if (testLoader.Options.ThreadCount >= 16)
device.ThrottleLimit = testLoader.Options.ThreadCount * 12;
Expand Down
12 changes: 7 additions & 5 deletions cs/benchmark/TestLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
using System.Runtime.InteropServices;
using System.Threading;

#pragma warning disable CS0162 // Unreachable code detected -- when switching on YcsbConstants

namespace FASTER.benchmark
{
internal interface IKeySetter<TKey>
Expand Down Expand Up @@ -332,10 +330,14 @@ private static void LoadSyntheticData<TKey, TKeySetter>(string distribution, uin

internal string BackupPath => $"{DataPath}/{this.Distribution}_{(this.Options.UseSyntheticData ? "synthetic" : "ycsb")}_{(this.Options.UseSmallData ? "2.5M_10M" : "250M_1000M")}";


internal bool CheckpointRecoverStore
=> Options.BackupAndRestore && Options.PeriodicCheckpointMilliseconds <= 0;

internal bool MaybeRecoverStore<K, V>(FasterKV<K, V> store)
{
// Recover database for fast benchmark repeat runs.
if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0)
if (CheckpointRecoverStore)
{
if (this.Options.UseSmallData)
{
Expand Down Expand Up @@ -364,12 +366,12 @@ internal bool MaybeRecoverStore<K, V>(FasterKV<K, V> store)
internal void MaybeCheckpointStore<K, V>(FasterKV<K, V> store)
{
// Checkpoint database for fast benchmark repeat runs.
if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0)
if (CheckpointRecoverStore)
{
Console.WriteLine($"Checkpointing FasterKV to {this.BackupPath} for fast restart");
var sw = Stopwatch.StartNew();
store.TakeFullCheckpoint(out _, CheckpointType.Snapshot);
store.CompleteCheckpointAsync().GetAwaiter().GetResult();
store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();
sw.Stop();
Console.WriteLine($" Completed checkpoint in {(double)sw.ElapsedMilliseconds / 1000:N3} seconds");
}
Expand Down
14 changes: 5 additions & 9 deletions cs/samples/ReadAddress/VersionedReadApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private async static Task PopulateStore(FasterKV<Key, Value> store)
using var s = store.For(new Functions()).NewSession<Functions>();
Console.WriteLine($"Writing {numKeys} keys to FASTER", numKeys);

Stopwatch sw = new Stopwatch();
Stopwatch sw = new();
sw.Start();
var prevLap = 0;
for (int ii = 0; ii < numKeys; ii++)
Expand Down Expand Up @@ -152,7 +152,6 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
var input = default(Value);
var key = new Key(keyValue);
RecordMetadata recordMetadata = default;
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var status = session.Read(ref key, ref input, ref output, ref recordMetadata, serialNo: maxLap + 1);
Expand All @@ -169,7 +168,7 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
}
}

if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version))
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
}
}
Expand All @@ -184,27 +183,24 @@ private static async Task ScanStoreAsync(FasterKV<Key, Value> store, int keyValu
var input = default(Value);
var key = new Key(keyValue);
RecordMetadata recordMetadata = default;
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1, cancellationToken: cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
var (status, output) = readAsyncResult.Complete(out recordMetadata);
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version))
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
}
}

private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output, ref int previousVersion)
private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output)
{
Debug.Assert((status == Status.NOTFOUND) == recordInfo.Tombstone);
Debug.Assert((lap == deleteLap) == recordInfo.Tombstone);
var value = recordInfo.Tombstone ? "<deleted>" : output.value.ToString();
Debug.Assert(previousVersion >= recordInfo.Version);
Console.WriteLine($" {value}; Version = {recordInfo.Version}; PrevAddress: {recordInfo.PreviousAddress}");
Console.WriteLine($" {value}; PrevAddress: {recordInfo.PreviousAddress}");

// Check for end of loop
previousVersion = recordInfo.Version;
return recordInfo.PreviousAddress >= store.Log.BeginAddress;
}
}
Expand Down
77 changes: 65 additions & 12 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal struct FullPageStatus
[FieldOffset(8)]
public long LastClosedUntilAddress;
[FieldOffset(16)]
public int Dirty;
public long Dirty;
}

[StructLayout(LayoutKind.Explicit)]
Expand Down Expand Up @@ -413,7 +413,7 @@ private protected void VerifyCompatibleSectorSize(IDevice device)
/// <param name="prevEndAddress"></param>
/// <param name="version"></param>
/// <param name="deltaLog"></param>
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog)
{
long startPage = GetPage(startAddress);
long endPage = GetPage(endAddress);
Expand Down Expand Up @@ -446,8 +446,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Version == RecordInfo.GetShortVersion(version))
if (info.Dirty)
{
info.DirtyAtomic = false; // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
{
Expand All @@ -457,9 +458,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
if (destOffset + size > entryLength)
throw new FasterException("Insufficient page size to write delta");
}
*((long*)(destPhysicalAddress + destOffset)) = logicalAddress;
*(long*)(destPhysicalAddress + destOffset) = logicalAddress;
destOffset += sizeof(long);
*((int*)(destPhysicalAddress + destOffset)) = alignedRecordSize;
*(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
destOffset += sizeof(int);
Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
destOffset += alignedRecordSize;
Expand Down Expand Up @@ -529,15 +530,15 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPage(long logicalAddress, int version)
internal void MarkPage(long logicalAddress, long version)
{
var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
if (PageStatusIndicator[offset].Dirty < version)
PageStatusIndicator[offset].Dirty = version;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPageAtomic(long logicalAddress, int version)
internal void MarkPageAtomic(long logicalAddress, long version)
{
var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
Utility.MonotonicUpdate(ref PageStatusIndicator[offset].Dirty, version, out _);
Expand Down Expand Up @@ -1810,7 +1811,7 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica
{
int totalNumPages = (int)(endPage - startPage);
completedSemaphore = new SemaphoreSlim(0);
var asyncResult = new PageAsyncFlushResult<Empty>
var flushCompletionTracker = new FlushCompletionTracker
{
completedSemaphore = completedSemaphore,
count = totalNumPages
Expand All @@ -1819,11 +1820,18 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica

for (long flushPage = startPage; flushPage < endPage; flushPage++)
{

long flushPageAddress = flushPage << LogPageSizeBits;
var pageSize = PageSize;

if (flushPage == endPage - 1)
pageSize = (int)(endLogicalAddress - (flushPage << LogPageSizeBits));
pageSize = (int)(endLogicalAddress - flushPageAddress);

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushPageAddress,
untilAddress = flushPageAddress + pageSize,
};

// Intended destination is flushPage
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets);
Expand Down Expand Up @@ -1977,7 +1985,52 @@ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, obj
}

PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)context;
if (Interlocked.Decrement(ref result.count) == 0)

// Unset dirty bit for flushed pages
bool epochTaken = false;
if (!epoch.ThisInstanceProtected())
{
epochTaken = true;
epoch.Resume();
}

try
{
var startAddress = result.page << LogPageSizeBits;
var endAddress = startAddress + PageSize;

if (result.fromAddress > startAddress)
startAddress = result.fromAddress;
var _readOnlyAddress = SafeReadOnlyAddress;
if (_readOnlyAddress > startAddress)
startAddress = _readOnlyAddress;

if (result.untilAddress < endAddress)
endAddress = result.untilAddress;
int flushWidth = (int)(endAddress - startAddress);

if (flushWidth > 0)
{
var physicalAddress = GetPhysicalAddress(startAddress);
var endPhysicalAddress = physicalAddress + flushWidth;

while (physicalAddress < endPhysicalAddress)
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Dirty)
info.DirtyAtomic = false; // there may be read locks being taken, hence atomic
physicalAddress += alignedRecordSize;
}
}
}
finally
{
if (epochTaken)
epoch.Suspend();
}

if (Interlocked.Decrement(ref result.flushCompletionTracker.count) == 0)
{
result.Free();
}
Expand Down
9 changes: 6 additions & 3 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres

var _objBuffer = bufferPool.Get(memoryStreamLength);

asyncResult.done = new AutoResetEvent(false);

var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1);

var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
Expand Down Expand Up @@ -479,6 +477,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
// Reset address list for next chunk
addr = new List<long>();

asyncResult.done = new AutoResetEvent(false);

objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
(int)(alignedDestinationAddress >> LogSegmentSizeBits),
Expand All @@ -493,6 +493,9 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
// need to write both page and object cache
Interlocked.Increment(ref asyncResult.count);

if (asyncResult.flushCompletionTracker != null)
Interlocked.Increment(ref asyncResult.flushCompletionTracker.count);

asyncResult.freeBuffer2 = _objBuffer;
objlogDevice.WriteAsync(
(IntPtr)_objBuffer.aligned_pointer,
Expand Down Expand Up @@ -1049,7 +1052,7 @@ internal override void MemoryPageScan(long beginAddress, long endAddress)
}
}

internal override void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
internal override void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog)
{
throw new FasterException("Incremental snapshots not supported with generic allocator");
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Async/CompletePendingAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal async ValueTask ReadyToCompletePendingAsync<Input, Output, Context>(Fas
#region Previous pending requests
if (!RelaxedCPR)
{
if (currentCtx.phase == Phase.IN_PROGRESS || currentCtx.phase == Phase.WAIT_PENDING)
if (currentCtx.phase == Phase.IN_PROGRESS)
{
if (currentCtx.prevCtx.SyncIoPendingCount != 0)
await currentCtx.prevCtx.readyResponses.WaitForEntryAsync(token).ConfigureAwait(false);
Expand Down Expand Up @@ -52,7 +52,7 @@ internal async ValueTask CompletePendingAsync<Input, Output, Context>(IFasterSes
#region Previous pending requests
if (!RelaxedCPR)
{
if (currentCtx.phase == Phase.IN_PROGRESS || currentCtx.phase == Phase.WAIT_PENDING)
if (currentCtx.phase == Phase.IN_PROGRESS)
{
fasterSession.UnsafeResumeThread();
try
Expand Down
Loading