Skip to content

Commit

Permalink
[C#] Fixes and refactoring of FASTERAsync (#480)
Browse files Browse the repository at this point in the history
* Fix to move await of flushtask to Slow*Async

* Fix setting of RecordInfo in ReadAsync

* FASTERAsync: do not use CompleteAsync.GetAwaiter() in Complete()

* fix asyncOp and asyncOperation usage

* Add XML comment for throttleLimit param

* Change flushTask to a SemaphoreSlim to avoid GetAwaiter().GetResult() in the sync case

* Add AsyncPool.Get() for sync paths and change MLSD to use it instead of GetAwaiter().GetResult()

* Add ConfigureAwait(false) to all awaits

* Break FASTERAsync out to separate files

* Prep for RMW refactor

* Rename UpdelAsync -> UpdateAsync

* Refactor RMWAsync to use UpdateAsync

* Add GetAwaiter().GetResult in one void-returning function where it can't be avoided,

* Update AsyncStress to include options for rmw-only, upsert-only, large vs. small memory log, and whether to use Value, Reference, or Span data types

* Remove obsolete RecordAccessor.cs

* Make IHeapContainer implement IDisposable

* Fix the TakeFullCheckpoint overload without CheckpointType to call through to the overload with it

* Update session.XxxAsync() doc comments for clarity and to emphasize the need to complete operations to avoid leaks

* add raw stddev %s to output

* Add -NoLock option to compare_runs

* Add comments to IUpdateAsyncOperation; fix comment on ccompare_runs

Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
  • Loading branch information
badrishc and TedHartMS authored May 23, 2021
1 parent 1b3100d commit 707636d
Show file tree
Hide file tree
Showing 46 changed files with 1,758 additions and 1,405 deletions.
15 changes: 14 additions & 1 deletion cs/benchmark/scripts/compare_runs.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
.PARAMETER RunSeconds
The directory containing the results of the new run, with the changes to be tested for impact; the result comparison is "NewDir throughput minus OldDir throughput".
.PARAMETER NoLock
Do not include results with locking
.EXAMPLE
./compare_runs.ps1 './baseline' './refactor_FASTERImpl'
.EXAMPLE
./compare_runs.ps1 './baseline' './refactor_FASTERImpl' -NoLock
#>
param (
[Parameter(Mandatory)] [String]$OldDir,
[Parameter(Mandatory)] [String]$NewDir
[Parameter(Mandatory)] [String]$NewDir,
[Parameter(Mandatory=$false)] [switch]$NoLock
)

class Result : System.IComparable, System.IEquatable[Object] {
Expand Down Expand Up @@ -232,12 +239,18 @@ $LoadResults.Sort()
$RunResults.Sort()

function RenameProperties([System.Object[]]$results) {
if ($NoLock) {
$results = $results | Where-Object {$_.LockMode -eq 0}
}

# Use this to rename "Percent" suffix to "%"
$results | Select-Object `
BaselineMean,
BaselineStdDev,
@{N='BStDev %';E={[System.Math]::Round(($_.BaselineStdDev / $_.BaselineMean) * 100, 1)}},
CurrentMean,
CurrentStdDev,
@{N='CStDev %';E={[System.Math]::Round(($_.CurrentStdDev / $_.CurrentMean) * 100, 1)}},
MeanDiff,
@{N='MeanDiff %';E={$_.MeanDiffPercent}},
StdDevDiff,
Expand Down
85 changes: 61 additions & 24 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,36 @@ public class FasterWrapper<Key, Value> : IFasterWrapper<Key, Value>
readonly FasterKV<Key, Value> _store;
readonly AsyncPool<ClientSession<Key, Value, Value, Value, Empty, SimpleFunctions<Key, Value, Empty>>> _sessionPool;
readonly bool useOsReadBuffering;
int upsertPendingCount = 0;
int pendingCount = 0;

// This can be used to verify the same amount data is loaded.
public long TailAddress => _store.Log.TailAddress;

// Indicates how many upsert operations went pending
public int UpsertPendingCount { get => upsertPendingCount; set => upsertPendingCount = value; }
public int PendingCount { get => pendingCount; set => pendingCount = value; }

public void ClearPendingCount() => pendingCount = 0;

// Whether OS Read buffering is enabled
public bool UseOsReadBuffering => useOsReadBuffering;

public FasterWrapper(bool useOsReadBuffering = false)
public FasterWrapper(bool isRefType, bool useLargeLog, bool useOsReadBuffering = false)
{
var logDirectory = "d:/AsyncStress";
var logFileName = Guid.NewGuid().ToString();
var logSettings = new LogSettings
{
LogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering),
ObjectLogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.obj.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering),
PageSizeBits = 12,
MemorySizeBits = 13
ObjectLogDevice = isRefType ? new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.obj.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering) : default
};

Console.WriteLine($" Using {logSettings.LogDevice.GetType()}");
if (!useLargeLog)
{
logSettings.PageSizeBits = 12;
logSettings.MemorySizeBits = 13;
}

Console.WriteLine($" FasterWrapper using {logSettings.LogDevice.GetType()}, {(useLargeLog ? "large" : "small")} memory log, and {(isRefType ? string.Empty : "no ")}object log");

this.useOsReadBuffering = useOsReadBuffering;
_store = new FasterKV<Key, Value>(1L << 20, logSettings);
Expand All @@ -46,16 +53,27 @@ public FasterWrapper(bool useOsReadBuffering = false)
() => _store.For(new SimpleFunctions<Key, Value, Empty>()).NewSession<SimpleFunctions<Key, Value, Empty>>());
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.UpsertAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.RmwAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

public async ValueTask UpsertAsync(Key key, Value value)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync();
var r = await session.UpsertAsync(key, value);
while (r.Status == Status.PENDING)
{
Interlocked.Increment(ref upsertPendingCount);
r = await r.CompleteAsync();
}
session = await _sessionPool.GetAsync().ConfigureAwait(false);
Interlocked.Add(ref pendingCount, await CompleteAsync(await session.UpsertAsync(key, value).ConfigureAwait(false)));
_sessionPool.Return(session);
}

Expand All @@ -69,26 +87,46 @@ public void Upsert(Key key, Value value)
}

public async ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync().ConfigureAwait(false);

for (var i = 0; i < count; ++i)
Interlocked.Add(ref pendingCount, await CompleteAsync(await session.UpsertAsync(chunk[offset + i].Item1, chunk[offset + i].Item2)));
_sessionPool.Return(session);
}

public async ValueTask RMWAsync(Key key, Value value)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync().ConfigureAwait(false);
Interlocked.Add(ref pendingCount, await CompleteAsync(await session.RMWAsync(key, value).ConfigureAwait(false)));
_sessionPool.Return(session);
}

public void RMW(Key key, Value value)
{
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.RMW(key, value);
Assert.True(status != Status.PENDING);
_sessionPool.Return(session);
}

public async ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync().ConfigureAwait(false);

for (var i = 0; i < count; ++i)
{
var r = await session.UpsertAsync(chunk[offset + i].Item1, chunk[offset + i].Item2);
while (r.Status == Status.PENDING)
{
Interlocked.Increment(ref upsertPendingCount);
r = await r.CompleteAsync();
}
}
Interlocked.Add(ref pendingCount, await CompleteAsync(await session.RMWAsync(chunk[offset + i].Item1, chunk[offset + i].Item2)));
_sessionPool.Return(session);
}

public async ValueTask<(Status, Value)> ReadAsync(Key key)
{
if (!_sessionPool.TryGet(out var session))
session = await _sessionPool.GetAsync();
session = await _sessionPool.GetAsync().ConfigureAwait(false);
var result = (await session.ReadAsync(key).ConfigureAwait(false)).Complete();
_sessionPool.Return(session);
return result;
Expand Down Expand Up @@ -129,7 +167,6 @@ public async ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int co
return result;
}


public void Dispose()
{
_sessionPool.Dispose();
Expand Down
9 changes: 8 additions & 1 deletion cs/playground/AsyncStress/IFasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ namespace AsyncStress
public interface IFasterWrapper<Key, Value>
{
long TailAddress { get; }
int UpsertPendingCount { get; set; }
int PendingCount { get; set; }
void ClearPendingCount();
bool UseOsReadBuffering { get; }

void Dispose();

ValueTask<(Status, Value)> Read(Key key);
ValueTask<(Status, Value)> ReadAsync(Key key);
ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count);

void Upsert(Key key, Value value);
ValueTask UpsertAsync(Key key, Value value);
ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count);

void RMW(Key key, Value value);
ValueTask RMWAsync(Key key, Value value);
ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count);
}
}
Loading

0 comments on commit 707636d

Please sign in to comment.