Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] [WIP] Status return code v2 #638

Merged
merged 13 commits into from
Feb 19, 2022
2 changes: 1 addition & 1 deletion cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
newValue.value = input.value + oldValue.value;
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => true;
public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { }

public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) => true;

Expand Down
12 changes: 6 additions & 6 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public FasterWrapper(bool isRefType, bool useLargeLog, bool useOsReadBuffering =
async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.UpsertAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.RmwAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -82,7 +82,7 @@ public void Upsert(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.Upsert(key, value);
Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand All @@ -109,7 +109,7 @@ public void RMW(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.RMW(key, value);
Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -137,14 +137,14 @@ public async ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var result = session.Read(key);
if (result.status == Status.PENDING)
if (result.status.Pending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
for (; completedOutputs.Next(); ++count)
{
Assert.Equal(key, completedOutputs.Current.Key);
result = (Status.OK, completedOutputs.Current.Output);
result = (completedOutputs.Current.Status, completedOutputs.Current.Output);
}
completedOutputs.Dispose();
Assert.Equal(1, count);
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/AsyncStress/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async ValueTask doUpdates()
Console.WriteLine(" Verifying read results ...");
Parallel.For(0, numOperations, i =>
{
Assert.Equal(Status.OK, results[i].Item1);
Assert.True(results[i].Item1.Found);
Assert.Equal(database[i].Item2, results[i].Item2);
});
Console.WriteLine(" Results verified");
Expand Down
13 changes: 8 additions & 5 deletions cs/playground/AsyncStress/SerializedFasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void Update<TUpdater, TAsyncResult>(TUpdater updater, Key key, Value valu
}
}

Assert.True(status != Status.PENDING);
Assert.False(status.Pending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -173,10 +173,11 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater

var (status, output) = (await task.ConfigureAwait(false)).Complete();
_sessionPool.Return(session);
Assert.True(status.CompletedSuccessfully);

using IMemoryOwner<byte> memoryOwner = output.Memory;

return (status, status != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
return (status, status.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}

public ValueTask<(Status, Value)> Read(Key key)
Expand All @@ -197,22 +198,24 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater
}
}

if (result.Item1 == Status.PENDING)
if (result.Item1.Pending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
for (; completedOutputs.Next(); ++count)
{
using IMemoryOwner<byte> memoryOwner = completedOutputs.Current.Output.Memory;
userResult = (completedOutputs.Current.Status, completedOutputs.Current.Status != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
Assert.True(completedOutputs.Current.Status.CompletedSuccessfully);
userResult = (completedOutputs.Current.Status, completedOutputs.Current.Status.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}
completedOutputs.Dispose();
Assert.Equal(1, count);
}
else
{
using IMemoryOwner<byte> memoryOwner = result.Item2.Memory;
userResult = (result.Item1, result.Item1 != Status.OK ? default : MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory));
Assert.True(result.Item1.CompletedSuccessfully);
userResult = (result.Item1, result.Item1.Found ? MessagePackSerializer.Deserialize<Value>(memoryOwner.Memory) : default);
}
_sessionPool.Return(session);
return new ValueTask<(Status, Value)>(userResult);
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedUpdaters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanBy
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -43,7 +43,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteA
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status == Status.PENDING; ++numPending)
for (; result.Status.Pending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand Down
19 changes: 9 additions & 10 deletions cs/playground/CacheStoreConcurrent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,17 @@ private static void RandomReadWorkload(int threadid)
var key = new CacheKey(k);
var status = hts.Read(ref key, ref output);

switch (status)
if (status.Pending)
{
case Status.PENDING:
statusPending++;
break;
case Status.OK:
if (output.value != key.key)
throw new Exception("Read error!");
break;
default:
throw new Exception("Error!");
statusPending++;
}
else if (status.Found)
{
if (output.value != key.key)
throw new Exception("Read error!");
}
else
throw new Exception("Error!");
i++;
}
/*
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/CacheStoreConcurrent/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public override void ReadCompletionCallback(ref CacheKey key, ref CacheValue inp
{
long ticks = Stopwatch.GetTimestamp() - ctx.ticks;

if (status == Status.NOTFOUND)
if (!status.Found)
Console.WriteLine("Async: Value not found, latency = {0}ms", 1000 * (ticks - ctx.ticks) / (double)Stopwatch.Frequency);

if (output.value != key.key)
Expand Down
57 changes: 28 additions & 29 deletions cs/playground/ClassRecoveryDurability/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using FASTER.core;

namespace ClassRecoveryDurablity
{
class Program
{
static bool stop;
static int deleteWindow = 5;
static int indexAhead = 10000000;
static int startDeleteHeight = 20;
static int addCount = 100;
static int deletePosition = 20;
static readonly int deleteWindow = 5;
static readonly int indexAhead = 10000000;
static readonly int startDeleteHeight = 20;
static readonly int addCount = 100;
static readonly int deletePosition = 20;

static void Main(string[] args)
{
Expand All @@ -39,7 +38,7 @@ static void Main(string[] args)

static void Filldb(int stopAfterIteration = 5)
{
Storedb store = new Storedb(@"C:\FasterTest\data");
Storedb store = new(@"C:\FasterTest\data");

Console.WriteLine("call init db");
var first = store.InitAndRecover();
Expand All @@ -52,7 +51,7 @@ static void Filldb(int stopAfterIteration = 5)
var ss = store.db.For(new Types.StoreFunctions()).NewSession<Types.StoreFunctions>();

lastBlockvalue = new Types.StoreValue { value = BitConverter.GetBytes(0) };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
ss.Upsert(ref lastblockKey, ref lastBlockvalue, context1, 1);

ss.CompletePending(true);
Expand All @@ -70,10 +69,10 @@ static void Filldb(int stopAfterIteration = 5)

while (stop == false)
{
Types.StoreInput input = new Types.StoreInput();
Types.StoreOutput output = new Types.StoreOutput();
Types.StoreInput input = new();
Types.StoreOutput output = new();
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
var blkStatus = session.Read(ref lastblockKey, ref input, ref output, context1, 1);
var blockHeight = BitConverter.ToUInt32(output.value.value);
blockHeight += 1;
Expand All @@ -86,12 +85,12 @@ static void Filldb(int stopAfterIteration = 5)

var upsertKey = new Types.StoreKey { tableType = "C", key = data.key };
var upsertValue = new Types.StoreValue { value = data.data };
Types.StoreContext context2 = new Types.StoreContext();
Types.StoreContext context2 = new();
var addStatus = session.Upsert(ref upsertKey, ref upsertValue, context2, 1);

// Console.WriteLine("add=" + i);

if (addStatus != Status.OK)
if (!addStatus.InPlaceUpdatedRecord)
throw new Exception();
}

Expand All @@ -106,11 +105,11 @@ static void Filldb(int stopAfterIteration = 5)
var data = Generate(i);

var deteletKey = new Types.StoreKey { tableType = "C", key = data.key };
Types.StoreContext context2 = new Types.StoreContext();
Types.StoreContext context2 = new();
var deleteStatus = session.Delete(ref deteletKey, context2, 1);
// Console.WriteLine("delete=" + i);

if (deleteStatus != Status.OK)
if (!deleteStatus.InPlaceUpdatedRecord)
throw new Exception();
}
}
Expand All @@ -120,7 +119,7 @@ static void Filldb(int stopAfterIteration = 5)

lastBlockvalue = new Types.StoreValue { value = BitConverter.GetBytes(blockHeight) };
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context = new Types.StoreContext();
Types.StoreContext context = new();
session.Upsert(ref lastblockKey, ref lastBlockvalue, context, 1);
session.CompletePending(true);

Expand Down Expand Up @@ -164,10 +163,10 @@ static void TestData(Storedb store, int count)
var lastBlockvalue = new Types.StoreValue();

// test all data up to now.
Types.StoreInput input = new Types.StoreInput();
Types.StoreOutput output = new Types.StoreOutput();
Types.StoreInput input = new();
Types.StoreOutput output = new();
lastblockKey = new Types.StoreKey { tableType = "L", key = new byte[1] { 0 } };
Types.StoreContext context1 = new Types.StoreContext();
Types.StoreContext context1 = new();
var blkStatus = session.Read(ref lastblockKey, ref input, ref output, context1, 1);
var blockHeight = BitConverter.ToUInt32(output.value.value);

Expand All @@ -188,41 +187,41 @@ static void TestData(Storedb store, int count)
{
var data = Generate(j);

Types.StoreInput input1 = new Types.StoreInput();
Types.StoreOutput output1 = new Types.StoreOutput();
Types.StoreContext context = new Types.StoreContext();
Types.StoreInput input1 = new();
Types.StoreOutput output1 = new();
Types.StoreContext context = new();
var readKey = new Types.StoreKey { tableType = "C", key = data.key };

var deleteStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test delete=" + i);

if (deleteStatus == Status.PENDING)
if (deleteStatus.Pending)
{
session.CompletePending(true);
context.FinalizeRead(ref deleteStatus, ref output1);
}

if (deleteStatus != Status.NOTFOUND)
if (deleteStatus.Found)
throw new Exception();
}
else
{
var data = Generate(i);

Types.StoreInput input1 = new Types.StoreInput();
Types.StoreOutput output1 = new Types.StoreOutput();
Types.StoreContext context = new Types.StoreContext();
Types.StoreInput input1 = new();
Types.StoreOutput output1 = new();
Types.StoreContext context = new();
var readKey = new Types.StoreKey { tableType = "C", key = data.key };
var addStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test add=" + i);

if (addStatus == Status.PENDING)
if (addStatus.Pending)
{
session.CompletePending(true);
context.FinalizeRead(ref addStatus, ref output1);
}

if (addStatus != Status.OK)
if (!addStatus.Found)
throw new Exception();

if (output1.value.value.SequenceEqual(data.data) == false)
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/SumStore/ConcurrencyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void PopulateWorker(int threadId)
{
var status = session.RMW(ref inputArray[i].adId, ref inputArray[i], Empty.Default, i);

if (status != Status.OK && status != Status.NOTFOUND)
if (!status.CompletedSuccessfully)
throw new Exception();

if (i % completePendingInterval == 0)
Expand Down Expand Up @@ -156,7 +156,7 @@ public void Test()
Input input = default;
Output output = default;
var status = session.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
if (status == Status.PENDING)
if (status.Pending)
throw new NotImplementedException();

inputArray[i].numClicks = output.value;
Expand Down
Loading