Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Lockable Unsafe Context (LUC) in FasterKV v2 (WIP) #605

Merged
merged 37 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0eb6b90
WIP on ManualFasterOperations
TedHartMS Nov 12, 2021
865d7b3
Merge branch 'v2' into unsafeclientsession
TedHartMS Nov 12, 2021
0d152dc
fixes for v2 merge
TedHartMS Nov 12, 2021
48e1063
Updates to locking (still in progress)
TedHartMS Nov 29, 2021
1da64b0
merge v2
TedHartMS Nov 29, 2021
bc91d2b
Locking docs
TedHartMS Dec 1, 2021
318c747
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Dec 1, 2021
6c57519
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Dec 1, 2021
29fc3c8
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Dec 2, 2021
bce809e
Update 30-fasterkv-manual-locking.md
badrishc Dec 6, 2021
9481fe7
Merge branch 'unsafeclientsession' of https://github.com/microsoft/FA…
TedHartMS Dec 8, 2021
d3ca140
Updated doc to GH review and discussions
TedHartMS Dec 8, 2021
7d0a5fc
WIP for LockableRawContext
TedHartMS Dec 17, 2021
29542aa
Existing tests run
TedHartMS Dec 18, 2021
2fb97a3
Rename to LockableRawContext, IFasterContext
TedHartMS Dec 18, 2021
c18a6ca
merge branch 'v2' into unsafeclientsession
TedHartMS Dec 20, 2021
56faae1
Perf workaround for LockTable.IsActive
TedHartMS Dec 21, 2021
805ae15
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Dec 21, 2021
4f0bec6
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Dec 22, 2021
7fb385f
WIP on ReadCacheEvict and MemoryPageLockEvictionScan
TedHartMS Dec 23, 2021
a6b1f3f
Add ReadCache and LockTable-transfer tests; move ReadCache tests out …
TedHartMS Dec 28, 2021
34a8866
Test MemoryPageLockEvictionScan
TedHartMS Dec 28, 2021
b5283fa
Add testing for two-phase upsert/copytotail
TedHartMS Jan 3, 2022
bf64ffb
Remove manual locking bits in Recovery; minor refactorings in Recover…
TedHartMS Jan 7, 2022
50b17ef
Add manual locking to FASTER.benchmark
TedHartMS Jan 8, 2022
72a87f7
More LockTable testing and fixes, mostly around locking nonexistent ke…
TedHartMS Jan 14, 2022
763a0ac
Add UnsafeContext and make it the default for FASTER.Benchmark
TedHartMS Jan 15, 2022
875a615
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Jan 16, 2022
204f258
Updates from code review:
TedHartMS Jan 17, 2022
5cba23b
Rename Take(Full|Index|HybridLog)Checkpoint to TryInitiate(Full|Index…
TedHartMS Jan 18, 2022
963bc57
- Remove SupportsPostOperations; this is now always done
TedHartMS Jan 18, 2022
a62e727
Add CopyWriter, to distinguish "maintenance" copying (CopyToTail or …
TedHartMS Jan 18, 2022
835568f
Merge branch 'unsafeclientsession' of https://github.com/microsoft/FA…
TedHartMS Jan 18, 2022
8a9d547
merge fixes
TedHartMS Jan 18, 2022
c96e487
Merge remote-tracking branch 'origin/v2' into unsafeclientsession
TedHartMS Jan 18, 2022
10ee16d
Remove obsolete Functions-level locking and postOps specifications
TedHartMS Jan 20, 2022
158928d
Fix Remote build
TedHartMS Jan 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ if (MSVC)
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /Od /RTC1 /MDd")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2 /Oi /Gy- /MD")

set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:NOICF")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:NOICF")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")

Expand Down
1 change: 1 addition & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C60F148B-2
..\docs\_docs\25-fasterkv-recovery.md = ..\docs\_docs\25-fasterkv-recovery.md
..\docs\_docs\26-fasterkv-samples.md = ..\docs\_docs\26-fasterkv-samples.md
..\docs\_docs\29-fasterkv-cpp.md = ..\docs\_docs\29-fasterkv-cpp.md
..\docs\_docs\30-fasterkv-manual-locking.md = ..\docs\_docs\30-fasterkv-manual-locking.md
..\docs\_docs\40-fasterlog-basics.md = ..\docs\_docs\40-fasterlog-basics.md
..\docs\_docs\43-fasterlog-tuning.md = ..\docs\_docs\43-fasterlog-tuning.md
..\docs\_docs\46-fasterlog-samples.md = ..\docs\_docs\46-fasterlog-samples.md
Expand Down
351 changes: 351 additions & 0 deletions cs/benchmark/FasterClientSessionYcsbBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;
using System.Diagnostics;
using System.Threading;

namespace FASTER.benchmark
{
internal class FASTER_ClientSessionYcsbBenchmark
{
// Ensure sizes are aligned to chunk sizes
// (both counts are assigned in CreateKeyVectors, rounded down to a multiple of YcsbConstants.kChunkSize)
static long InitCount;
static long TxnCount;

// Loader captured by the constructor; its Options are consulted by the worker threads.
readonly TestLoader testLoader;
// Gate the worker threads wait on before starting; Run sets it to release them and resets it afterwards.
readonly ManualResetEventSlim waiter = new();
readonly int numaStyle;
readonly int readPercent;
readonly Functions functions;
// Eight pre-built inputs used by the RMW operations in RunYcsb (selected with idx & 0x7).
readonly Input[] input_;

// Keys upserted by SetupYcsb (load phase) and keys used by RunYcsb (transaction phase).
readonly Key[] init_keys_;
readonly Key[] txn_keys_;

readonly IDevice device;
readonly FasterKV<Key, Value> store;

// Shared cursor from which worker threads claim chunks of keys via Interlocked.Add.
long idx_ = 0;
// Sum of per-thread read and write counts, added by each RunYcsb thread as it finishes.
long total_ops_done = 0;
// Set to true by Run to tell the RunYcsb threads to stop.
volatile bool done = false;

/// <summary>
/// Captures the key arrays and benchmark options, optionally pins the calling thread,
/// and creates the log device and the <see cref="FasterKV{Key, Value}"/> store.
/// </summary>
/// <param name="i_keys_">Keys upserted during the load phase.</param>
/// <param name="t_keys_">Keys used during the transaction phase.</param>
/// <param name="testLoader">Supplies the benchmark options, device path and checkpoint directory.</param>
internal FASTER_ClientSessionYcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoader)
{
    var options = testLoader.Options;

    // Affinitize the main thread to the last core on the first socket, but only when
    // the experiment's worker threads will not be using that core.
    var (numGrps, numProcs) = Native32.GetNumGroupsProcsPerGroup();
    var lastProc = numProcs - 1;
    bool lastProcIsFree =
        (options.NumaStyle == 0 && options.ThreadCount <= lastProc) ||
        (options.NumaStyle == 1 && options.ThreadCount <= numGrps * lastProc);
    if (lastProcIsFree)
    {
        Native32.AffinitizeThreadRoundRobin(lastProc);
    }

    this.testLoader = testLoader;
    init_keys_ = i_keys_;
    txn_keys_ = t_keys_;
    numaStyle = options.NumaStyle;
    readPercent = options.ReadPercent;
    functions = new Functions();

    // Pre-build the inputs; slot i carries the value i.
    input_ = new Input[8];
    for (int slot = 0; slot < input_.Length; slot++)
    {
        input_[slot].value = slot;
    }

    device = Devices.CreateLogDevice(
        TestLoader.DevicePath,
        preallocateFile: true,
        deleteOnClose: !testLoader.RecoverMode,
        useIoCompletionPort: true);

    if (options.ThreadCount >= 16)
    {
        device.ThrottleLimit = options.ThreadCount * 12;
    }

    var checkpointSettings = new CheckpointSettings { CheckpointDir = testLoader.BackupPath };
    bool ephemeralLocking = testLoader.LockImpl == LockImpl.Ephemeral;

    if (options.UseSmallMemoryLog)
    {
        // Small-memory configuration: explicit page/segment/memory sizes and a quarter of MaxKey as the index size.
        var smallLogSettings = new LogSettings
        {
            LogDevice = device,
            PreallocateLog = true,
            PageSizeBits = 25,
            SegmentSizeBits = 30,
            MemorySizeBits = 28
        };
        store = new FasterKV<Key, Value>(testLoader.MaxKey / 4, smallLogSettings, checkpointSettings, supportsLocking: ephemeralLocking);
    }
    else
    {
        var defaultLogSettings = new LogSettings { LogDevice = device, PreallocateLog = true };
        store = new FasterKV<Key, Value>(testLoader.MaxKey / 2, defaultLogSettings, checkpointSettings, supportsLocking: ephemeralLocking);
    }
}

/// <summary>
/// Releases the resources owned by this benchmark instance: the store, the log device,
/// and the event used to gate worker-thread start.
/// </summary>
internal void Dispose()
{
    store.Dispose();
    device.Dispose();

    // ManualResetEventSlim is IDisposable; release it along with the other owned resources
    // so a wait handle it may have allocated is not left to the finalizer.
    waiter.Dispose();
}

/// <summary>
/// Worker-thread body for the transaction phase. Waits on the start gate, then repeatedly
/// claims chunks of transaction keys and issues a Read, Upsert or RMW per key until
/// <c>done</c> is set; finally reports its counts and adds them to <c>total_ops_done</c>.
/// </summary>
/// <param name="thread_idx">Zero-based worker index; used for thread affinity and to seed the random generator.</param>
private void RunYcsb(int thread_idx)
{
    RandomGenerator rng = new((uint)(1 + thread_idx));

    if (numaStyle == 0)
    {
        Native32.AffinitizeThreadRoundRobin((uint)thread_idx);
    }
    else
    {
        Native32.AffinitizeThreadShardedNuma((uint)thread_idx, 2); // assuming two NUMA sockets
    }

    waiter.Wait();

    var timer = Stopwatch.StartNew();

    Value value = default;
    Input input = default;
    Output output = default;

    long readCount = 0;
    long writeCount = 0;

    var session = store.For(functions).NewSession<Functions>(null, !testLoader.Options.NoThreadAffinity);

    // Atomically advances the shared cursor and returns the start index of the claimed chunk.
    long ClaimChunk() => Interlocked.Add(ref idx_, YcsbConstants.kChunkSize) - YcsbConstants.kChunkSize;

    while (!done)
    {
        long chunkStart = ClaimChunk();
        while (chunkStart >= TxnCount)
        {
            // The thread that lands exactly on TxnCount rewinds the cursor; the others just retry.
            if (chunkStart == TxnCount)
            {
                idx_ = 0;
            }
            chunkStart = ClaimChunk();
        }

        long chunkEnd = chunkStart + YcsbConstants.kChunkSize;
        for (long idx = chunkStart; idx < chunkEnd && !done; ++idx)
        {
            // Read with probability readPercent/100; otherwise Upsert when readPercent is
            // non-negative, and RMW when it is negative.
            int roll = (int)rng.Generate(100);
            Op op = roll < readPercent
                ? Op.Read
                : (readPercent >= 0 ? Op.Upsert : Op.ReadModifyWrite);

            // Every 512 keys: refresh the session (unless thread affinity is disabled)
            // and complete pending operations without waiting.
            if (idx % 512 == 0)
            {
                if (!testLoader.Options.NoThreadAffinity)
                {
                    session.Refresh();
                }
                session.CompletePending(false);
            }

            if (op == Op.Upsert)
            {
                session.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
                ++writeCount;
            }
            else if (op == Op.Read)
            {
                session.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
                ++readCount;
            }
            else if (op == Op.ReadModifyWrite)
            {
                session.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
                ++writeCount;
            }
            else
            {
                throw new InvalidOperationException("Unexpected op: " + op);
            }
        }
    }

    session.CompletePending(true);
    session.Dispose();

    timer.Stop();

    Console.WriteLine("Thread " + thread_idx + " done; " + readCount + " reads, " +
        writeCount + " writes, in " + timer.ElapsedMilliseconds + " ms.");
    Interlocked.Add(ref total_ops_done, readCount + writeCount);
}

/// <summary>
/// Runs one benchmark iteration: loads the store (unless it was recovered), then runs the
/// timed transaction phase on <c>Options.ThreadCount</c> worker threads, optionally taking
/// periodic hybrid-log checkpoints, and prints the resulting statistics.
/// </summary>
/// <param name="testLoader">Supplies the options, recovery and checkpoint helpers for this run.</param>
/// <returns>
/// A tuple of (inserts per second during the load phase, operations per second during the
/// transaction phase). The first item is 0 when the load phase was skipped or took 0 ms.
/// </returns>
internal unsafe (double, double) Run(TestLoader testLoader)
{
    ClientSession<Key, Value, Input, Output, Empty, Functions> session = default;
    LockableUnsafeContext<Key, Value, Input, Output, Empty, Functions> luContext = default;

    // In manual-locking mode, hold one exclusive and one shared lock for the whole run,
    // on two keys at the top of the long range.
    (Key key, LockType kind) xlock = (new Key { value = long.MaxValue }, LockType.Exclusive);
    (Key key, LockType kind) slock = (new Key { value = long.MaxValue - 1 }, LockType.Shared);
    if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
    {
        session = store.For(functions).NewSession<Functions>(null);
        luContext = session.GetLockableUnsafeContext();

        Console.WriteLine("Taking 2 manual locks");
        luContext.Lock(xlock.key, xlock.kind);
        luContext.Lock(slock.key, slock.kind);
    }

    Thread[] workers = new Thread[testLoader.Options.ThreadCount];

    Console.WriteLine("Executing setup.");

    var storeWasRecovered = testLoader.MaybeRecoverStore(store);
    long elapsedMs = 0;
    if (!storeWasRecovered)
    {
        // Setup the store for the YCSB benchmark.
        Console.WriteLine("Loading FasterKV from data");
        for (int idx = 0; idx < testLoader.Options.ThreadCount; ++idx)
        {
            int x = idx; // copy so each closure captures its own index
            workers[idx] = new Thread(() => SetupYcsb(x));
        }

        foreach (Thread worker in workers)
        {
            worker.Start();
        }

        // Release all loaders at once, then time until the last one finishes.
        waiter.Set();
        var sw = Stopwatch.StartNew();
        foreach (Thread worker in workers)
        {
            worker.Join();
        }
        sw.Stop();
        elapsedMs = sw.ElapsedMilliseconds;
        waiter.Reset();
    }
    double insertsPerSecond = elapsedMs == 0 ? 0 : ((double)InitCount / elapsedMs) * 1000;
    Console.WriteLine(TestStats.GetLoadingTimeLine(insertsPerSecond, elapsedMs));
    Console.WriteLine(TestStats.GetAddressesLine(AddressLineNum.Before, store.Log.BeginAddress, store.Log.HeadAddress, store.Log.ReadOnlyAddress, store.Log.TailAddress));

    if (!storeWasRecovered)
        testLoader.MaybeCheckpointStore(store);

    // Uncomment below to dispose log from memory, use for 100% read workloads only
    // store.Log.DisposeFromMemory();

    // Rewind the shared chunk cursor before the transaction phase.
    idx_ = 0;

    if (testLoader.Options.DumpDistribution)
        Console.WriteLine(store.DumpDistribution());

    // Ensure first fold-over checkpoint is fast
    if (testLoader.Options.PeriodicCheckpointMilliseconds > 0 && testLoader.Options.PeriodicCheckpointType == CheckpointType.FoldOver)
        store.Log.ShiftReadOnlyAddress(store.Log.TailAddress, true);

    Console.WriteLine("Executing experiment.");

    // Run the experiment.
    for (int idx = 0; idx < testLoader.Options.ThreadCount; ++idx)
    {
        int x = idx; // copy so each closure captures its own index
        workers[idx] = new Thread(() => RunYcsb(x));
    }
    // Start threads.
    foreach (Thread worker in workers)
    {
        worker.Start();
    }

    waiter.Set();
    var swatch = Stopwatch.StartNew();

    if (testLoader.Options.PeriodicCheckpointMilliseconds <= 0)
    {
        Thread.Sleep(TimeSpan.FromSeconds(testLoader.Options.RunSeconds));
    }
    else
    {
        var checkpointTaken = 0;
        while (swatch.ElapsedMilliseconds < 1000 * testLoader.Options.RunSeconds)
        {
            // Take the next checkpoint once another full checkpoint period has elapsed.
            if (checkpointTaken < swatch.ElapsedMilliseconds / testLoader.Options.PeriodicCheckpointMilliseconds)
            {
                long start = swatch.ElapsedTicks;
                if (store.TryInitiateHybridLogCheckpoint(out _, testLoader.Options.PeriodicCheckpointType, testLoader.Options.PeriodicCheckpointTryIncremental))
                {
                    store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();

                    // Stopwatch ticks are counted in units of 1/Stopwatch.Frequency seconds, which is
                    // not necessarily the 100 ns TimeSpan tick, so convert using Stopwatch.Frequency.
                    var timeTaken = (swatch.ElapsedTicks - start) * 1000 / Stopwatch.Frequency;
                    Console.WriteLine("Checkpoint time: {0}ms", timeTaken);
                    checkpointTaken++;
                }
            }
        }
        Console.WriteLine($"Checkpoint taken {checkpointTaken}");
    }

    swatch.Stop();

    // Signal the workers to stop and wait for them to finish.
    done = true;

    foreach (Thread worker in workers)
    {
        worker.Join();
    }

    if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
    {
        luContext.Unlock(xlock.key, xlock.kind);
        luContext.Unlock(slock.key, slock.kind);
        luContext.Dispose();
        session.Dispose();
    }

    waiter.Reset();

    double seconds = swatch.ElapsedMilliseconds / 1000.0;
    Console.WriteLine(TestStats.GetAddressesLine(AddressLineNum.After, store.Log.BeginAddress, store.Log.HeadAddress, store.Log.ReadOnlyAddress, store.Log.TailAddress));

    double opsPerSecond = total_ops_done / seconds;
    Console.WriteLine(TestStats.GetTotalOpsString(total_ops_done, seconds));
    Console.WriteLine(TestStats.GetStatsLine(StatsLineNum.Iteration, YcsbConstants.OpsPerSec, opsPerSecond));
    return (insertsPerSecond, opsPerSecond);
}

/// <summary>
/// Worker-thread body for the load phase. Waits on the start gate, then claims chunks of
/// initial keys from the shared cursor and upserts each key until all <c>InitCount</c>
/// keys have been claimed.
/// </summary>
/// <param name="thread_idx">Zero-based worker index; used for thread affinity.</param>
private void SetupYcsb(int thread_idx)
{
    if (numaStyle == 0)
    {
        Native32.AffinitizeThreadRoundRobin((uint)thread_idx);
    }
    else
    {
        Native32.AffinitizeThreadShardedNuma((uint)thread_idx, 2); // assuming two NUMA sockets
    }

    waiter.Wait();

    var session = store.For(functions).NewSession<Functions>(null, !testLoader.Options.NoThreadAffinity);

    Value value = default;

    // Atomically advances the shared cursor and returns the start index of the claimed chunk.
    long ClaimChunk() => Interlocked.Add(ref idx_, YcsbConstants.kChunkSize) - YcsbConstants.kChunkSize;

    long chunkStart = ClaimChunk();
    while (chunkStart < InitCount)
    {
        long chunkEnd = chunkStart + YcsbConstants.kChunkSize;
        for (long idx = chunkStart; idx < chunkEnd; ++idx)
        {
            // Refresh the session every 256 keys; additionally complete pending
            // operations (without waiting) every 65536 keys.
            bool refreshDue = idx % 256 == 0;
            if (refreshDue)
            {
                session.Refresh();

                bool completeDue = idx % 65536 == 0;
                if (completeDue)
                {
                    session.CompletePending(false);
                }
            }

            session.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
        }

        chunkStart = ClaimChunk();
    }

    session.CompletePending(true);
    session.Dispose();
}

#region Load Data

/// <summary>
/// Sets the static <c>InitCount</c> and <c>TxnCount</c> to the loader's counts rounded down to a
/// multiple of <c>YcsbConstants.kChunkSize</c>, and allocates key arrays of those sizes.
/// </summary>
/// <param name="testLoader">Supplies the requested initial and transaction key counts.</param>
/// <param name="i_keys">Receives a new array of <c>InitCount</c> keys.</param>
/// <param name="t_keys">Receives a new array of <c>TxnCount</c> keys.</param>
internal static void CreateKeyVectors(TestLoader testLoader, out Key[] i_keys, out Key[] t_keys)
{
    // Rounds a count down to a whole number of chunks.
    static long WholeChunks(long count) => YcsbConstants.kChunkSize * (count / YcsbConstants.kChunkSize);

    InitCount = WholeChunks(testLoader.InitCount);
    TxnCount = WholeChunks(testLoader.TxnCount);

    i_keys = new Key[InitCount];
    t_keys = new Key[TxnCount];
}

/// <summary>
/// <see cref="IKeySetter{Key}"/> implementation that stores a long value into a slot of a key array.
/// </summary>
internal class KeySetter : IKeySetter<Key>
{
    /// <summary>Writes <paramref name="value"/> into the key at position <paramref name="idx"/>.</summary>
    /// <param name="vector">Key array to modify.</param>
    /// <param name="idx">Index of the key to set.</param>
    /// <param name="value">Value assigned to the key's <c>value</c> field.</param>
    public void Set(Key[] vector, long idx, long value)
    {
        vector[idx].value = value;
    }
}

#endregion
}
}
Loading