Skip to content

Commit

Permalink
[C#] [WIP] Status return code v2 (#638)
Browse files Browse the repository at this point in the history
* Prototype of status as struct

* improving status a bit

* fix build break, expose underlying (raw) status code to user

* misc

* Update StatusCode

* Update Remote to new Status(Code); fix test cases; fix StatusCode layout; fine-tune Status(Code) names; make StatusCode internal

* Update StatusCode and Status to final spec

* Update Remote to new Status spec

* unrelated nit

Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
  • Loading branch information
badrishc and TedHartMS authored Feb 19, 2022
1 parent c63068c commit 5fc8c9d
Show file tree
Hide file tree
Showing 105 changed files with 1,436 additions and 1,156 deletions.
2 changes: 1 addition & 1 deletion cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
newValue.value = input.value + oldValue.value;
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => true;
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { }

public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) => true;

Expand Down
12 changes: 6 additions & 6 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public FasterWrapper(bool isRefType, bool useLargeLog, bool useOsReadBuffering =
async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.UpsertAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.RmwAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -82,7 +82,7 @@ public void Upsert(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.Upsert(key, value);
Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand All @@ -109,7 +109,7 @@ public void RMW(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.RMW(key, value);
Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -137,14 +137,14 @@ public async ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var result = session.Read(key);
if (result.status == Status.PENDING)
if (result.status.Pending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
for (; completedOutputs.Next(); ++count)
{
Assert.Equal(key, completedOutputs.Current.Key);
result = (Status.OK, completedOutputs.Current.Output);
result = (completedOutputs.Current.Status, completedOutputs.Current.Output);
}
completedOutputs.Dispose();
Assert.Equal(1, count);
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/AsyncStress/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async ValueTask doUpdates()
Console.WriteLine(" Verifying read results ...");
Parallel.For(0, numOperations, i =>
{
Assert.Equal(Status.OK, results[i].Item1);
Assert.True(results[i].Item1.Found);
Assert.Equal(database[i].Item2, results[i].Item2);
});
Console.WriteLine(" Results verified");
Expand Down
13 changes: 8 additions & 5 deletions cs/playground/AsyncStress/SerializedFasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void Update<TUpdater, TAsyncResult>(TUpdater updater, Key key, Value valu
}
}

Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -173,10 +173,11 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater

var (status, output) = (await task.ConfigureAwait(false)).Complete();
_sessionPool.Return(session);
Assert.True(status.CompletedSuccessfully);

using IMemoryOwner<byte> memoryOwner = output.Memory;

return (status, status != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
return (status, status.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}

public ValueTask<(Status, Value)> Read(Key key)
Expand All @@ -197,22 +198,24 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater
}
}

if (result.Item1 == Status.PENDING)
if (result.Item1.Pending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
for (; completedOutputs.Next(); ++count)
{
using IMemoryOwner<byte> memoryOwner = completedOutputs.Current.Output.Memory;
userResult = (completedOutputs.Current.Status, completedOutputs.Current.Status != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
Assert.True(completedOutputs.Current.Status.CompletedSuccessfully);
userResult = (completedOutputs.Current.Status, completedOutputs.Current.Status.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}
completedOutputs.Dispose();
Assert.Equal(1, count);
}
else
{
using IMemoryOwner<byte> memoryOwner = result.Item2.Memory;
userResult = (result.Item1, result.Item1 != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
Assert.True(result.Item1.CompletedSuccessfully);
userResult = (result.Item1, result.Item1.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}
_sessionPool.Return(session);
return new ValueTask<(Status, Value)>(userResult);
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedUpdaters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanBy
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -43,7 +43,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteA
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand Down
19 changes: 9 additions & 10 deletions cs/playground/CacheStoreConcurrent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,17 @@ private static void RandomReadWorkload(int threadid)
var key = new CacheKey(k);
var status = hts.Read(ref key, ref output);

switch (status)
if (status.Pending)
{
case Status.PENDING:
statusPending++;
break;
case Status.OK:
if (output.value != key.key)
throw new Exception("Read error!");
break;
default:
throw new Exception("Error!");
statusPending++;
}
else if (status.Found)
{
if (output.value != key.key)
throw new Exception("Read error!");
}
else
throw new Exception("Error!");
i++;
}
/*
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/CacheStoreConcurrent/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public override void ReadCompletionCallback(ref CacheKey key, ref CacheValue inp
{
long ticks = Stopwatch.GetTimestamp() - ctx.ticks;

if (status == Status.NOTFOUND)
if (!status.Found)
Console.WriteLine("Async: Value not found, latency = {0}ms", 1000 * (ticks - ctx.ticks) / (double)Stopwatch.Frequency);

if (output.value != key.key)
Expand Down
57 changes: 28 additions & 29 deletions cs/playground/ClassRecoveryDurability/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using FASTER.core;

namespace ClassRecoveryDurablity
{
class Program
{
static bool stop;
static int deleteWindow = 5;
static int indexAhead = 10000000;
static int startDeleteHeight = 20;
static int addCount = 100;
static int deletePosition = 20;
static readonly int deleteWindow = 5;
static readonly int indexAhead = 10000000;
static readonly int startDeleteHeight = 20;
static readonly int addCount = 100;
static readonly int deletePosition = 20;

static void Main(string[] args)
{
Expand All @@ -39,7 +38,7 @@ static void Main(string[] args)

static void Filldb(int stopAfterIteration = 5)
{
Storedb store = new Storedb(@"C:\FasterTest\data");
Storedb store = new(@"C:\FasterTest\data");

Console.WriteLine("call init db");
var first = store.InitAndRecover();
Expand All @@ -52,7 +51,7 @@ static void Filldb(int stopAfterIteration = 5)
var ss = store.db.For(new Types.StoreFunctions()).NewSession<Types.StoreFunctions>();

lastBlockvalue = new Types.StoreValue { value = BitConverter.GetBytes(0) };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
ss.Upsert(ref lastblockKey, ref lastBlockvalue, context1, 1);

ss.CompletePending(true);
Expand All @@ -70,10 +69,10 @@ static void Filldb(int stopAfterIteration = 5)

while (stop == false)
{
Types.StoreInput input = new Types.StoreInput();
Types.StoreOutput output = new Types.StoreOutput();
Types.StoreInput input = new();
Types.StoreOutput output = new();
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
var blkStatus = session.Read(ref lastblockKey, ref input, ref output, context1, 1);
var blockHeight = BitConverter.ToUInt32(output.value.value);
blockHeight += 1;
Expand All @@ -86,12 +85,12 @@ static void Filldb(int stopAfterIteration = 5)

var upsertKey = new Types.StoreKey { tableType = "C", key = data.key };
var upsertValue = new Types.StoreValue { value = data.data };
Types.StoreContext context2 = new Types.StoreContext();
Types.StoreContext context2 = new();
var addStatus = session.Upsert(ref upsertKey, ref upsertValue, context2, 1);

// Console.WriteLine("add=" + i);

if (addStatus != Status.OK)
if (!addStatus.InPlaceUpdatedRecord)
throw new Exception();
}

Expand All @@ -106,11 +105,11 @@ static void Filldb(int stopAfterIteration = 5)
var data = Generate(i);

var deteletKey = new Types.StoreKey { tableType = "C", key = data.key };
Types.StoreContext context2 = new Types.StoreContext();
Types.StoreContext context2 = new();
var deleteStatus = session.Delete(ref deteletKey, context2, 1);
// Console.WriteLine("delete=" + i);

if (deleteStatus != Status.OK)
if (!deleteStatus.InPlaceUpdatedRecord)
throw new Exception();
}
}
Expand All @@ -120,7 +119,7 @@ static void Filldb(int stopAfterIteration = 5)

lastBlockvalue = new Types.StoreValue { value = BitConverter.GetBytes(blockHeight) };
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context = new Types.StoreContext();
Types.StoreContext context = new();
session.Upsert(ref lastblockKey, ref lastBlockvalue, context, 1);
session.CompletePending(true);

Expand Down Expand Up @@ -164,10 +163,10 @@ static void TestData(Storedb store, int count)
var lastBlockvalue = new Types.StoreValue();

// test all data up to now.
Types.StoreInput input = new Types.StoreInput();
Types.StoreOutput output = new Types.StoreOutput();
Types.StoreInput input = new();
Types.StoreOutput output = new();
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
var blkStatus = session.Read(ref lastblockKey, ref input, ref output, context1, 1);
var blockHeight = BitConverter.ToUInt32(output.value.value);

Expand All @@ -188,41 +187,41 @@ static void TestData(Storedb store, int count)
{
var data = Generate(j);

Types.StoreInput input1 = new Types.StoreInput();
Types.StoreOutput output1 = new Types.StoreOutput();
Types.StoreContext context = new Types.StoreContext();
Types.StoreInput input1 = new();
Types.StoreOutput output1 = new();
Types.StoreContext context = new();
var readKey = new Types.StoreKey { tableType = "C", key = data.key };

var deleteStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test delete=" + i);

if (deleteStatus == Status.PENDING)
if (deleteStatus.Pending)
{
session.CompletePending(true);
context.FinalizeRead(ref deleteStatus, ref output1);
}

if (deleteStatus != Status.NOTFOUND)
if (deleteStatus.Found)
throw new Exception();
}
else
{
var data = Generate(i);

Types.StoreInput input1 = new Types.StoreInput();
Types.StoreOutput output1 = new Types.StoreOutput();
Types.StoreContext context = new Types.StoreContext();
Types.StoreInput input1 = new();
Types.StoreOutput output1 = new();
Types.StoreContext context = new();
var readKey = new Types.StoreKey { tableType = "C", key = data.key };
var addStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test add=" + i);

if (addStatus == Status.PENDING)
if (addStatus.Pending)
{
session.CompletePending(true);
context.FinalizeRead(ref addStatus, ref output1);
}

if (addStatus != Status.OK)
if (!addStatus.Found)
throw new Exception();

if (output1.value.value.SequenceEqual(data.data) == false)
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/SumStore/ConcurrencyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void PopulateWorker(int threadId)
{
var status = session.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i);

if (status != Status.OK && status != Status.NOTFOUND)
if (!status.CompletedSuccessfully)
throw new Exception();

if (i % completePendingInterval == 0)
Expand Down Expand Up @@ -156,7 +156,7 @@ public void Test()
Input input = default;
Output output = default;
var status = session.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
if (status == Status.PENDING)
if (status.Pending)
throw new NotImplementedException();

inputArray[i].numClicks = output.value;
Expand Down
Loading

0 comments on commit 5fc8c9d

Please sign in to comment.