Skip to content

Commit

Permalink
[C#] Extension to allow Read of all versions of key in FasterKV log (#…
Browse files Browse the repository at this point in the history
…347)

* Remove obsolete/duplicate ClientSession.ReadAsyncResult
* Improve ReadAddress App, add ReadAddressTests
* Move RecordInfo to PendingContext only; fix TakeFullCheckpoint calls in test/sample
* Fix to CPR handling for Read(.., startAddress, ..)
* Mark old FunctionsBase.ReadCompletionCallback(...) form obsolete and have the new form call the old one so existing implementations continue to work
* Rename ReadAddress sample to VersionedRead
* Merge IAdvancedFunctions (changes to ClientSession.cs and FASTERThread.cs were already there)
* Merge startAddress and RecordInfo args on Read() overload
* Implement ReadAtAddress (pass address instead of key)
* Full IAdvancedFunctions implementation (with ReadCompletionCallback(...., RecordInfo)
* Narrow NewSession API
- Make FasterKV methods taking Functions type parameter internal; FasterKV.For(f) should be used instead, to get IFunctions vs. IAdvancedFunctions overloading
- Remove parameterless FasterKV.For<>()
* Make IClientSession and non-Advanced ClientSession Read(..., ref RecordInfo) operations DEBUG-only
* ReadAtAddress-related fixes
* Add logicalAddresses to IAdvancedFunctions
* Add some Checkpoint methods to IFasterKV; add RecordAccessor member to FasterKV; add comments
* Add more ReadAtAddress/NoKey UTs; tighten up startAddress handling in InternalRead
* Add ReadFlags to the new Read-by-address overloads; move readcache check to internal
* Remove an obsolete line
* Fix ReadAddressTests to use / rather than \\ for Linux compat

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TedHartMS and badrishc authored Dec 1, 2020
1 parent 1c1e1a6 commit 5281504
Show file tree
Hide file tree
Showing 27 changed files with 3,111 additions and 395 deletions.
11 changes: 11 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureBackedStore", "samples
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecondaryReaderStore", "samples\SecondaryReaderStore\SecondaryReaderStore.csproj", "{EBE313E5-22D2-4C74-BA1F-16B60404B335}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VersionedRead", "samples\ReadAddress\VersionedRead.csproj", "{33ED9E1B-1EF0-4984-A07A-7A26C205A446}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -215,6 +217,14 @@ Global
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|Any CPU.Build.0 = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.ActiveCfg = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.Build.0 = Release|x64
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Debug|x64.ActiveCfg = Debug|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Debug|x64.Build.0 = Debug|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|Any CPU.Build.0 = Release|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.ActiveCfg = Release|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -241,6 +251,7 @@ Global
{F6EA46D5-DD66-47F2-8FAC-370FDD733DD3} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{EBE313E5-22D2-4C74-BA1F-16B60404B335} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{33ED9E1B-1EF0-4984-A07A-7A26C205A446} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;

Expand Down Expand Up @@ -254,6 +253,7 @@ public unsafe void Run()
var storeWasRecovered = false;
if (this.backupMode.HasFlag(BackupMode.Restore) && kPeriodicCheckpointMilliseconds <= 0)
{
Console.WriteLine("Recovering store for fast restart");
sw.Start();
try
{
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/CacheStoreConcurrent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static void RandomReadWorkload(int threadid)
var sessions = new ClientSession<CacheKey, CacheValue, CacheValue, CacheValue, CacheContext, CacheFunctions>[kNumTables];

for (int ht = 0; ht < kNumTables; ht++)
sessions[ht] = h[ht].NewSession<CacheValue, CacheValue, CacheContext, CacheFunctions>(new CacheFunctions());
sessions[ht] = h[ht].For(new CacheFunctions()).NewSession<CacheFunctions>();

var rnd = new Random(threadid);

Expand Down
60 changes: 60 additions & 0 deletions cs/samples/ReadAddress/Types.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System.Threading;

namespace ReadAddress
{
public struct Key
{
public long key;

public Key(long first) => key = first;

public override string ToString() => key.ToString();

internal class Comparer : IFasterEqualityComparer<Key>
{
public long GetHashCode64(ref Key key) => Utility.GetHashCode(key.key);

public bool Equals(ref Key k1, ref Key k2) => k1.key == k2.key;
}
}

public struct Value
{
public long value;

public Value(long first) => value = first;

public override string ToString() => value.ToString();
}

public class Context
{
public RecordInfo recordInfo;
public Status status;
}

/// <summary>
/// Callback for FASTER operations
/// </summary>
public class Functions : AdvancedSimpleFunctions<Key, Value, Context>
{
// Return false to force a chain of values.
public override bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, long address) => false;

public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, long address) => false;

// Track the recordInfo for its PreviousAddress.
public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status, RecordInfo recordInfo)
{
if (!(ctx is null))
{
ctx.recordInfo = recordInfo;
ctx.status = status;
}
}
}
}
12 changes: 12 additions & 0 deletions cs/samples/ReadAddress/VersionedRead.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>

</Project>
206 changes: 206 additions & 0 deletions cs/samples/ReadAddress/VersionedReadApp.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace ReadAddress
{
class VersionedReadApp
{
// Number of keys in store
const int numKeys = 1000;
const int keyMod = 100;
const int maxLap = numKeys / keyMod;
const int deleteLap = maxLap / 2;

const string ReadCacheArg = "--readcache";
static bool useReadCache = false;
const string CheckpointsArg = "--checkpoints";
static bool useCheckpoints = true;
const string RMWArg = "--rmw";
static bool useRMW = false;

private static void Usage()
{
Console.WriteLine("Reads 'linked lists' of records for each key by backing up the previous-address chain, including showing record versions");
Console.WriteLine("Usage:");
Console.WriteLine($" {ReadCacheArg}: use Read Cache; default = {useReadCache}");
Console.WriteLine($" {CheckpointsArg}: issue periodic checkpoints during load; default = {useCheckpoints}");
Console.WriteLine($" {RMWArg}: issue periodic checkpoints during load; default = {useRMW}");
}

static async Task<int> Main(string[] args)
{
for (var ii = 0; ii < args.Length; ++ii)
{
var arg = args[ii];
if (arg.ToLower() == ReadCacheArg)
{
useReadCache = true;
continue;
}
if (arg.ToLower() == CheckpointsArg)
{
useCheckpoints = true;
continue;
}
if (arg.ToLower() == RMWArg)
{
useRMW = true;
continue;
}
Console.WriteLine($"Unknown option: {arg}");
Usage();
return -1;
}

var (store, log, path) = CreateStore();
await PopulateStore(store);

const int keyToScan = 42;
ScanStore(store, keyToScan);
var cts = new CancellationTokenSource();
await ScanStoreAsync(store, keyToScan, cts.Token);

// Clean up
store.Dispose();
log.Dispose();

// Delete the created files
try { new DirectoryInfo(path).Delete(true); } catch { }

Console.WriteLine("Press <ENTER> to end");
Console.ReadLine();
return 0;
}

private static (FasterKV<Key, Value>, IDevice, string) CreateStore()
{
var path = Path.GetTempPath() + "FasterReadAddressSample\\";
var log = Devices.CreateLogDevice(path + "hlog.log");

var logSettings = new LogSettings
{
LogDevice = log,
ObjectLogDevice = new NullDevice(),
ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null,
// Use small-footprint values
PageSizeBits = 12, // (4K pages)
MemorySizeBits = 20 // (1M memory for main log)
};

var store = new FasterKV<Key, Value>(
size: 1L << 20,
logSettings: logSettings,
checkpointSettings: new CheckpointSettings { CheckpointDir = path },
serializerSettings: null,
comparer: new Key.Comparer()
);
return (store, log, path);
}

private async static Task PopulateStore(FasterKV<Key, Value> store)
{
// Start session with FASTER
using var s = store.For(new Functions()).NewSession<Functions>();
Console.WriteLine($"Writing {numKeys} keys to FASTER", numKeys);

Stopwatch sw = new Stopwatch();
sw.Start();
var prevLap = 0;
for (int ii = 0; ii < numKeys; ii++)
{
// lap is used to illustrate the changing values
var lap = ii / keyMod;

if (useCheckpoints && lap != prevLap)
{
await store.TakeFullCheckpointAsync(CheckpointType.FoldOver);
prevLap = lap;
}

var key = new Key(ii % keyMod);

var value = new Value(key.key + (lap * numKeys * 100));
if (useRMW)
s.RMW(ref key, ref value, serialNo: lap);
else
s.Upsert(ref key, ref value, serialNo: lap);

// Illustrate that deleted records can be shown as well (unless overwritten by in-place operations, which are not done here)
if (lap == deleteLap)
s.Delete(ref key, serialNo: lap);
}
sw.Stop();
double numSec = sw.ElapsedMilliseconds / 1000.0;
Console.WriteLine("Total time to upsert {0} elements: {1:0.000} secs ({2:0.00} inserts/sec)", numKeys, numSec, numKeys / numSec);
}

private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
{
// Start session with FASTER
using var session = store.For(new Functions()).NewSession<Functions>();

Console.WriteLine($"Sync scanning records for key {keyValue}");

var output = default(Value);
var input = default(Value);
var key = new Key(keyValue);
RecordInfo recordInfo = default;
var context = new Context();
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var status = session.Read(ref key, ref input, ref output, ref recordInfo, userContext: context, serialNo: maxLap + 1);
if (status == Status.PENDING)
{
// This will spin CPU for each retrieved record; not recommended for performance-critical code or when retrieving chains for multiple records.
session.CompletePending(spinWait: true);
recordInfo = context.recordInfo;
status = context.status;
}
if (!ProcessRecord(store, status, recordInfo, lap, ref output, ref version))
break;
}
}

private static async Task ScanStoreAsync(FasterKV<Key, Value> store, int keyValue, CancellationToken cancellationToken)
{
// Start session with FASTER
using var session = store.For(new Functions()).NewSession<Functions>();

Console.WriteLine($"Async scanning records for key {keyValue}");

var input = default(Value);
var key = new Key(keyValue);
RecordInfo recordInfo = default;
int version = int.MaxValue;
for (int lap = 9; /* tested in loop */; --lap)
{
var readAsyncResult = await session.ReadAsync(ref key, ref input, recordInfo.PreviousAddress, default, serialNo: maxLap + 1, cancellationToken: cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
var (status, output) = readAsyncResult.Complete(out recordInfo);
if (!ProcessRecord(store, status, recordInfo, lap, ref output, ref version))
break;
}
}

private static bool ProcessRecord(FasterKV<Key, Value> store, Status status, RecordInfo recordInfo, int lap, ref Value output, ref int previousVersion)
{
Debug.Assert((status == Status.NOTFOUND) == recordInfo.Tombstone);
Debug.Assert((lap == deleteLap) == recordInfo.Tombstone);
var value = recordInfo.Tombstone ? "<deleted>" : output.value.ToString();
Debug.Assert(previousVersion >= recordInfo.Version);
Console.WriteLine($" {value}; Version = {recordInfo.Version}; PrevAddress: {recordInfo.PreviousAddress}");

// Check for end of loop
previousVersion = recordInfo.Version;
return recordInfo.PreviousAddress >= store.Log.BeginAddress;
}
}
}
2 changes: 1 addition & 1 deletion cs/samples/StoreDiskReadBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static async Task AsyncReadOperator(int id)
Input input = default;
int i = 0;

var tasks = new (long, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Empty, MyFuncs>>)[readBatchSize];
var tasks = new (long, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<Input, Output, Empty>>)[readBatchSize];
while (true)
{
key = new Key(NumKeys * id + rand.Next(0, NumKeys));
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,8 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, object cont
// We have the complete record.
if (RetrievedFullRecord(record, ref ctx))
{
if (comparer.Equals(ref ctx.request_key.Get(), ref GetContextRecordKey(ref ctx)))
// ReadAtAddress does not have a request key, so it is an implicit match.
if (ctx.request_key is null || comparer.Equals(ref ctx.request_key.Get(), ref GetContextRecordKey(ref ctx)))
{
// The keys are same, so I/O is complete
// ctx.record = result.record;
Expand Down
Loading

0 comments on commit 5281504

Please sign in to comment.