Skip to content

Commit

Permalink
[C#] Add Status.Record (#659)
Browse files Browse the repository at this point in the history
* Add Status.Record; update OperationStatus with all Advanced Status codes

* Update Remote for status.Pending => .IsPending
  • Loading branch information
TedHartMS authored Feb 22, 2022
1 parent 69f8eda commit 34a7576
Show file tree
Hide file tree
Showing 69 changed files with 306 additions and 286 deletions.
10 changes: 5 additions & 5 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public FasterWrapper(bool isRefType, bool useLargeLog, bool useOsReadBuffering =
async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.UpsertAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.RmwAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -82,7 +82,7 @@ public void Upsert(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.Upsert(key, value);
Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand All @@ -109,7 +109,7 @@ public void RMW(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.RMW(key, value);
Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -137,7 +137,7 @@ public async ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var result = session.Read(key);
if (result.status.Pending)
if (result.status.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedFasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void Update<TUpdater, TAsyncResult>(TUpdater updater, Key key, Value valu
}
}

Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -198,7 +198,7 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater
}
}

if (result.Item1.Pending)
if (result.Item1.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedUpdaters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanBy
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -43,7 +43,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteA
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/CacheStoreConcurrent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private static void RandomReadWorkload(int threadid)
var key = new CacheKey(k);
var status = hts.Read(ref key, ref output);

if (status.Pending)
if (status.IsPending)
{
statusPending++;
}
Expand Down
8 changes: 4 additions & 4 deletions cs/playground/ClassRecoveryDurability/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ static void Filldb(int stopAfterIteration = 5)

// Console.WriteLine("add=" + i);

if (!addStatus.InPlaceUpdatedRecord)
if (!addStatus.Record.InPlaceUpdated)
throw new Exception();
}

Expand All @@ -109,7 +109,7 @@ static void Filldb(int stopAfterIteration = 5)
var deleteStatus = session.Delete(ref deteletKey, context2, 1);
// Console.WriteLine("delete=" + i);

if (!deleteStatus.InPlaceUpdatedRecord)
if (!deleteStatus.Record.InPlaceUpdated)
throw new Exception();
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ static void TestData(Storedb store, int count)
var deleteStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test delete=" + i);

if (deleteStatus.Pending)
if (deleteStatus.IsPending)
{
session.CompletePending(true);
context.FinalizeRead(ref deleteStatus, ref output1);
Expand All @@ -215,7 +215,7 @@ static void TestData(Storedb store, int count)
var addStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test add=" + i);

if (addStatus.Pending)
if (addStatus.IsPending)
{
session.CompletePending(true);
context.FinalizeRead(ref addStatus, ref output1);
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/SumStore/ConcurrencyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void Test()
Input input = default;
Output output = default;
var status = session.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
if (status.Pending)
if (status.IsPending)
throw new NotImplementedException();

inputArray[i].numClicks = output.value;
Expand Down
6 changes: 3 additions & 3 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private unsafe void ProcessBatch(byte* buf, int offset)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));

if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.Found)
serializer.SkipOutput(ref dcurr);
Expand All @@ -186,7 +186,7 @@ private unsafe void ProcessBatch(byte* buf, int offset)

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.IsCompletedSuccessfully)
serializer.SkipOutput(ref dcurr);
Expand Down Expand Up @@ -273,7 +273,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
if (valPtr == null)
status = session.Read(ref key, ref serializer.ReadInputByRef(ref inputPtr), ref serializer.AsRefOutput(outputDcurr, (int)(dend - dcurr)), ctx, 0);

if (!status.Pending)
if (!status.IsPending)
{
// Write six bytes (message | status | sid)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Expand Down
6 changes: 3 additions & 3 deletions cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));

if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.Found)
serializer.SkipOutput(ref dcurr);
Expand All @@ -403,7 +403,7 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));

if (subscribeKVBroker != null)
Expand Down Expand Up @@ -589,7 +589,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
if (valPtr == null)
status = session.Read(ref key, ref serializer.ReadInputByRef(ref inputPtr), ref serializer.AsRefOutput(outputDcurr, (int)(dend - dcurr)), ctx, 0);

if (!status.Pending)
if (!status.IsPending)
{
// Write six bytes (message | status | sid)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/AzureBackedStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static void Main()

var status = s.Read(ref key, ref output, context);

if (!status.Pending)
if (!status.IsPending)
{
if (status.Found && output == context)
Console.WriteLine("Success!");
Expand Down Expand Up @@ -94,7 +94,7 @@ static void Main()

var status = s.Read(ref key, ref output, context);

if (!status.Pending)
if (!status.IsPending)
{
if (status.Found && output == context)
Console.WriteLine("Success!");
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/CacheStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue> store, int
var key = new CacheKey(k);
var status = s.Read(ref key, ref output);

if (status.Pending)
if (status.IsPending)
{
statusPending++;
if (statusPending % 100 == 0)
Expand Down Expand Up @@ -186,7 +186,7 @@ private static void InteractiveReadWorkload(FasterKV<CacheKey, CacheValue> store

context.ticks = Stopwatch.GetTimestamp();
var status = s.Read(ref key, ref output, context);
if (status.Pending)
if (status.IsPending)
{
s.CompletePending(true);
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/HelloWorld/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void DiskSample()
// Reads from disk will return PENDING status, result available via either asynchronous IFunctions callback
// or on this thread via CompletePendingWithOutputs, shown below
status = session.Read(ref key, ref output);
if (status.Pending)
if (status.IsPending)
{
session.CompletePendingWithOutputs(out var iter, true);
while (iter.Next())
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/ReadAddress/VersionedReadApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
var status = session.Read(ref key, ref input, ref output, ref recordMetadata, ReadFlags.SkipCopyReads, serialNo: maxLap + 1);

// This will wait for each retrieved record; not recommended for performance-critical code or when retrieving multiple records unless necessary.
if (status.Pending)
if (status.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
using (completedOutputs)
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/StoreAsyncApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ static async Task AsyncOperator(int id)
if (asyncUpsert)
{
var r = await session.UpsertAsync(ref key, ref value, context, seqNo++);
while (r.Status.Pending)
while (r.Status.IsPending)
r = await r.CompleteAsync();
}
else
Expand Down Expand Up @@ -130,7 +130,7 @@ static async Task AsyncOperator(int id)
for (int i = 0; i < batchSize; i++)
{
var r = await taskBatch[i];
while (r.Status.Pending)
while (r.Status.IsPending)
r = await r.CompleteAsync();
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreCustomTypes/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static void Main()
status = s.Read(ref key, ref input, ref g2, context, 0);

// We will receive the result via ReadCompletionCallback in Functions
if (!status.Pending)
if (!status.IsPending)
Console.WriteLine("Error!");

// End session when done
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/StoreDiskReadBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static async Task AsyncReadOperator(int id)
var result = session.Read(ref key, ref input, ref output, Empty.Default, 0);
if (readBatching)
{
if (!result.Pending)
if (!result.IsPending)
{
if (output.value.vfield1 != key.key)
{
Expand All @@ -166,7 +166,7 @@ static async Task AsyncReadOperator(int id)
}
else
{
if (result.Pending)
if (result.IsPending)
{
session.CompletePending(true);
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/AsciiSumSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void Run()
store.Log.FlushAndEvict(true); // Flush and evict all records to disk
var _status = s.RMW(_key, _input); // CopyUpdater to 270 (due to immutable source value on disk)

if (_status.Pending)
if (_status.IsPending)
{
Console.WriteLine("Error!");
return;
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/MemoryByteSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Span.Fill((byte)valLen);

if (status.Pending)
if (status.IsPending)
s.CompletePending(true);
else
{
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/MemoryIntSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Span.Fill(valLen);

if (status.Pending)
if (status.IsPending)
s.CompletePending(true);
else
{
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/SpanByteSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Fill((byte)valLen);

if (status.Pending)
if (status.IsPending)
{
s.CompletePending(true);
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Async/DeleteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ internal DeleteAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Va
/// <summary>Complete the Delete operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
/// <returns>ValueTask for Delete result. User needs to await again if result status is Status.PENDING.</returns>
public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(CancellationToken token = default)
=> this.Status.Pending
=> this.Status.IsPending
? updateAsyncInternal.CompleteAsync(token)
: new ValueTask<DeleteAsyncResult<Input, Output, Context>>(new DeleteAsyncResult<Input, Output, Context>(this.Status));

/// <summary>Complete the Delete operation, issuing additional I/O synchronously if needed.</summary>
/// <returns>Status of Delete operation</returns>
public Status Complete() => this.Status.Pending ? updateAsyncInternal.Complete().Status : this.Status;
public Status Complete() => this.Status.IsPending ? updateAsyncInternal.Complete().Status : this.Status;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
10 changes: 5 additions & 5 deletions cs/src/core/Async/RMWAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
pendingContext.serialNum, asyncOp, out flushEvent, out newDiskRequest);
output = pendingContext.output;

if (status.Pending && !newDiskRequest.IsDefault())
if (status.IsPending && !newDiskRequest.IsDefault())
{
flushEvent = default;
this.diskRequest = newDiskRequest;
Expand All @@ -56,7 +56,7 @@ public bool CompletePendingIO(IFasterSession<Key, Value, Input, Output, Context>
var status = completedOutputs.Next() ? completedOutputs.Current.Status : new(StatusCode.Error);
completedOutputs.Dispose();
this.diskRequest = default;
return !status.Pending;
return !status.IsPending;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -109,7 +109,7 @@ internal RmwAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value
/// <summary>Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
/// <returns>ValueTask for RMW result. User needs to await again if result status is pending.</returns>
public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(CancellationToken token = default)
=> this.Status.Pending
=> this.Status.IsPending
? updateAsyncInternal.CompleteAsync(token)
: new ValueTask<RmwAsyncResult<Input, TOutput, Context>>(new RmwAsyncResult<Input, TOutput, Context>(this.Status, this.Output, this.RecordMetadata));

Expand All @@ -122,7 +122,7 @@ public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(Cancella
/// <returns>Status of RMW operation</returns>
public (Status status, TOutput output) Complete(out RecordMetadata recordMetadata)
{
if (!this.Status.Pending)
if (!this.Status.IsPending)
{
recordMetadata = this.RecordMetadata;
return (this.Status, this.Output);
Expand Down Expand Up @@ -154,7 +154,7 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
{
Output output = default;
var status = CallInternalRMW(fasterSession, currentCtx, ref pcontext, ref key, ref input, ref output, context, serialNo, asyncOp: true, out flushEvent, out diskRequest);
if (!status.Pending)
if (!status.IsPending)
return new ValueTask<RmwAsyncResult<Input, Output, Context>>(new RmwAsyncResult<Input, Output, Context>(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
finally
Expand Down
Loading

0 comments on commit 34a7576

Please sign in to comment.