Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Checkpoint, Recovery, RecordInfo updates #588

Merged
merged 9 commits into from
Nov 2, 2021
2 changes: 1 addition & 1 deletion cs/benchmark/FasterSpanByteYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys
for (int i = 0; i < 8; i++)
input_[i].value = i;

device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true);
device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.RecoverMode, useIoCompletionPort: true);

if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV<SpanByte, SpanByte>
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade
for (int i = 0; i < 8; i++)
input_[i].value = i;

device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true, useIoCompletionPort: true);
device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.RecoverMode, useIoCompletionPort: true);

if (testLoader.Options.ThreadCount >= 16)
device.ThrottleLimit = testLoader.Options.ThreadCount * 12;
Expand Down
4 changes: 2 additions & 2 deletions cs/benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class Program

public static void Main(string[] args)
{
TestLoader testLoader = new();
if (!testLoader.Parse(args))
TestLoader testLoader = new(args);
if (testLoader.error)
return;

TestStats testStats = new(testLoader.Options);
Expand Down
49 changes: 25 additions & 24 deletions cs/benchmark/TestLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
using System.Runtime.InteropServices;
using System.Threading;

#pragma warning disable CS0162 // Unreachable code detected -- when switching on YcsbConstants

namespace FASTER.benchmark
{
internal interface IKeySetter<TKey>
Expand All @@ -21,27 +19,29 @@ internal interface IKeySetter<TKey>

class TestLoader
{
internal Options Options;

internal BenchmarkType BenchmarkType;
internal LockImpl LockImpl;
internal string Distribution;

internal readonly Options Options;
internal readonly string Distribution;
internal Key[] init_keys = default;
internal Key[] txn_keys = default;
internal KeySpanByte[] init_span_keys = default;
internal KeySpanByte[] txn_span_keys = default;

internal long InitCount;
internal long TxnCount;
internal int MaxKey;
internal readonly BenchmarkType BenchmarkType;
internal readonly LockImpl LockImpl;
internal readonly long InitCount;
internal readonly long TxnCount;
internal readonly int MaxKey;
internal readonly bool RecoverMode;
internal readonly bool error;

internal bool Parse(string[] args)

internal TestLoader(string[] args)
{
error = true;
ParserResult<Options> result = Parser.Default.ParseArguments<Options>(args);
if (result.Tag == ParserResultType.NotParsed)
{
return false;
return;
}
Options = result.MapResult(o => o, xs => new Options());

Expand All @@ -54,33 +54,34 @@ static bool verifyOption(bool isValid, string name)

this.BenchmarkType = (BenchmarkType)Options.Benchmark;
if (!verifyOption(Enum.IsDefined(typeof(BenchmarkType), this.BenchmarkType), "Benchmark"))
return false;
return;

if (!verifyOption(Options.NumaStyle >= 0 && Options.NumaStyle <= 1, "NumaStyle"))
return false;
return;

this.LockImpl = (LockImpl)Options.LockImpl;
if (!verifyOption(Enum.IsDefined(typeof(LockImpl), this.LockImpl), "Lock Implementation"))
return false;
return;

if (!verifyOption(Options.IterationCount > 0, "Iteration Count"))
return false;
return;

if (!verifyOption(Options.ReadPercent >= -1 && Options.ReadPercent <= 100, "Read Percent"))
return false;
return;

this.Distribution = Options.DistributionName.ToLower();
if (!verifyOption(this.Distribution == YcsbConstants.UniformDist || this.Distribution == YcsbConstants.ZipfDist, "Distribution"))
return false;
return;

if (!verifyOption(this.Options.RunSeconds >= 0, "RunSeconds"))
return false;
return;

this.InitCount = this.Options.UseSmallData ? 2500480 : 250000000;
this.TxnCount = this.Options.UseSmallData ? 10000000 : 1000000000;
this.MaxKey = this.Options.UseSmallData ? 1 << 22 : 1 << 28;
this.RecoverMode = Options.BackupAndRestore && Options.PeriodicCheckpointMilliseconds <= 0;

return true;
error = false;
}

internal void LoadData()
Expand Down Expand Up @@ -335,7 +336,7 @@ private static void LoadSyntheticData<TKey, TKeySetter>(string distribution, uin
internal bool MaybeRecoverStore<K, V>(FasterKV<K, V> store)
{
// Recover database for fast benchmark repeat runs.
if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0)
if (RecoverMode)
{
if (this.Options.UseSmallData)
{
Expand Down Expand Up @@ -364,12 +365,12 @@ internal bool MaybeRecoverStore<K, V>(FasterKV<K, V> store)
internal void MaybeCheckpointStore<K, V>(FasterKV<K, V> store)
{
// Checkpoint database for fast benchmark repeat runs.
if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0)
if (RecoverMode)
{
Console.WriteLine($"Checkpointing FasterKV to {this.BackupPath} for fast restart");
var sw = Stopwatch.StartNew();
store.TakeFullCheckpoint(out _, CheckpointType.Snapshot);
store.CompleteCheckpointAsync().GetAwaiter().GetResult();
store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();
sw.Stop();
Console.WriteLine($" Completed checkpoint in {(double)sw.ElapsedMilliseconds / 1000:N3} seconds");
}
Expand Down
4 changes: 2 additions & 2 deletions cs/benchmark/YcsbConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

namespace FASTER.benchmark
{
enum BenchmarkType : int
enum BenchmarkType : byte
{
Ycsb,
SpanByte,
ConcurrentDictionaryYcsb
};

enum LockImpl : int
enum LockImpl : byte
{
None,
RecordInfo
Expand Down
14 changes: 5 additions & 9 deletions cs/samples/ReadAddress/VersionedReadApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private async static Task PopulateStore(FasterKV<Key, Value> store)
using var s = store.For(new Functions()).NewSession<Functions>();
Console.WriteLine($"Writing {numKeys} keys to FASTER", numKeys);

Stopwatch sw = new Stopwatch();
Stopwatch sw = new();
sw.Start();
var prevLap = 0;
for (int ii = 0; ii < numKeys; ii++)
Expand Down Expand Up @@ -152,7 +152,6 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
var input = default(Value);
var key = new Key(keyValue);
RecordMetadata recordMetadata = default;
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var status = session.Read(ref key, ref input, ref output, ref recordMetadata, serialNo: maxLap + 1);
Expand All @@ -169,7 +168,7 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
}
}

if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version))
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
}
}
Expand All @@ -184,27 +183,24 @@ private static async Task ScanStoreAsync(FasterKV<Key, Value> store, int keyValu
var input = default(Value);
var key = new Key(keyValue);
RecordMetadata recordMetadata = default;
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1, cancellationToken: cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
var (status, output) = readAsyncResult.Complete(out recordMetadata);
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version))
if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output))
break;
}
}

private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output, ref int previousVersion)
private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output)
{
Debug.Assert((status == Status.NOTFOUND) == recordInfo.Tombstone);
Debug.Assert((lap == deleteLap) == recordInfo.Tombstone);
var value = recordInfo.Tombstone ? "<deleted>" : output.value.ToString();
Debug.Assert(previousVersion >= recordInfo.Version);
Console.WriteLine($" {value}; Version = {recordInfo.Version}; PrevAddress: {recordInfo.PreviousAddress}");
Console.WriteLine($" {value}; PrevAddress: {recordInfo.PreviousAddress}");

// Check for end of loop
previousVersion = recordInfo.Version;
return recordInfo.PreviousAddress >= store.Log.BeginAddress;
}
}
Expand Down
77 changes: 65 additions & 12 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal struct FullPageStatus
[FieldOffset(8)]
public long LastClosedUntilAddress;
[FieldOffset(16)]
public int Dirty;
public long Dirty;
}

[StructLayout(LayoutKind.Explicit)]
Expand Down Expand Up @@ -413,7 +413,7 @@ private protected void VerifyCompatibleSectorSize(IDevice device)
/// <param name="prevEndAddress"></param>
/// <param name="version"></param>
/// <param name="deltaLog"></param>
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog)
{
long startPage = GetPage(startAddress);
long endPage = GetPage(endAddress);
Expand Down Expand Up @@ -446,8 +446,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Version == RecordInfo.GetShortVersion(version))
if (info.Dirty)
{
info.DirtyAtomic = false; // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
{
Expand All @@ -457,9 +458,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
if (destOffset + size > entryLength)
throw new FasterException("Insufficient page size to write delta");
}
*((long*)(destPhysicalAddress + destOffset)) = logicalAddress;
*(long*)(destPhysicalAddress + destOffset) = logicalAddress;
destOffset += sizeof(long);
*((int*)(destPhysicalAddress + destOffset)) = alignedRecordSize;
*(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
destOffset += sizeof(int);
Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
destOffset += alignedRecordSize;
Expand Down Expand Up @@ -529,15 +530,15 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPage(long logicalAddress, int version)
internal void MarkPage(long logicalAddress, long version)
{
var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
if (PageStatusIndicator[offset].Dirty < version)
PageStatusIndicator[offset].Dirty = version;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPageAtomic(long logicalAddress, int version)
internal void MarkPageAtomic(long logicalAddress, long version)
{
var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
Utility.MonotonicUpdate(ref PageStatusIndicator[offset].Dirty, version, out _);
Expand Down Expand Up @@ -1810,7 +1811,7 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica
{
int totalNumPages = (int)(endPage - startPage);
completedSemaphore = new SemaphoreSlim(0);
var asyncResult = new PageAsyncFlushResult<Empty>
var flushCompletionTracker = new FlushCompletionTracker
{
completedSemaphore = completedSemaphore,
count = totalNumPages
Expand All @@ -1819,11 +1820,18 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica

for (long flushPage = startPage; flushPage < endPage; flushPage++)
{

long flushPageAddress = flushPage << LogPageSizeBits;
var pageSize = PageSize;

if (flushPage == endPage - 1)
pageSize = (int)(endLogicalAddress - (flushPage << LogPageSizeBits));
pageSize = (int)(endLogicalAddress - flushPageAddress);

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushPageAddress,
untilAddress = flushPageAddress + pageSize,
};

// Intended destination is flushPage
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets);
Expand Down Expand Up @@ -1977,7 +1985,52 @@ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, obj
}

PageAsyncFlushResult<Empty> result = (PageAsyncFlushResult<Empty>)context;
if (Interlocked.Decrement(ref result.count) == 0)

// Unset dirty bit for flushed pages
bool epochTaken = false;
if (!epoch.ThisInstanceProtected())
{
epochTaken = true;
epoch.Resume();
}

try
{
var startAddress = result.page << LogPageSizeBits;
var endAddress = startAddress + PageSize;

if (result.fromAddress > startAddress)
startAddress = result.fromAddress;
var _readOnlyAddress = SafeReadOnlyAddress;
if (_readOnlyAddress > startAddress)
startAddress = _readOnlyAddress;

if (result.untilAddress < endAddress)
endAddress = result.untilAddress;
int flushWidth = (int)(endAddress - startAddress);

if (flushWidth > 0)
{
var physicalAddress = GetPhysicalAddress(startAddress);
var endPhysicalAddress = physicalAddress + flushWidth;

while (physicalAddress < endPhysicalAddress)
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Dirty)
info.DirtyAtomic = false; // there may be read locks being taken, hence atomic
physicalAddress += alignedRecordSize;
}
}
}
finally
{
if (epochTaken)
epoch.Suspend();
}

if (Interlocked.Decrement(ref result.flushCompletionTracker.count) == 0)
{
result.Free();
}
Expand Down
Loading