Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Add Status.Record #659

Merged
merged 2 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public FasterWrapper(bool isRefType, bool useLargeLog, bool useOsReadBuffering =
async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.UpsertAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}

async ValueTask<int> CompleteAsync(FasterKV<Key, Value>.RmwAsyncResult<Value, Value, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -82,7 +82,7 @@ public void Upsert(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.Upsert(key, value);
Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand All @@ -109,7 +109,7 @@ public void RMW(Key key, Value value)
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var status = session.RMW(key, value);
Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -137,7 +137,7 @@ public async ValueTask RMWChunkAsync((Key, Value)[] chunk, int offset, int count
if (!_sessionPool.TryGet(out var session))
session = _sessionPool.GetAsync().GetAwaiter().GetResult();
var result = session.Read(key);
if (result.status.Pending)
if (result.status.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedFasterWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void Update<TUpdater, TAsyncResult>(TUpdater updater, Key key, Value valu
}
}

Assert.False(status.Pending);
Assert.False(status.IsPending);
_sessionPool.Return(session);
}

Expand Down Expand Up @@ -198,7 +198,7 @@ public async ValueTask UpdateChunkAsync<TUpdater, TAsyncResult>(TUpdater updater
}
}

if (result.Item1.Pending)
if (result.Item1.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
int count = 0;
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/AsyncStress/SerializedUpdaters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanBy
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.UpsertAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand All @@ -43,7 +43,7 @@ public ValueTask<FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteA
public async ValueTask<int> CompleteAsync(FasterKV<SpanByte, SpanByte>.RmwAsyncResult<SpanByte, SpanByteAndMemory, Empty> result)
{
var numPending = 0;
for (; result.Status.Pending; ++numPending)
for (; result.Status.IsPending; ++numPending)
result = await result.CompleteAsync().ConfigureAwait(false);
return numPending;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/CacheStoreConcurrent/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private static void RandomReadWorkload(int threadid)
var key = new CacheKey(k);
var status = hts.Read(ref key, ref output);

if (status.Pending)
if (status.IsPending)
{
statusPending++;
}
Expand Down
8 changes: 4 additions & 4 deletions cs/playground/ClassRecoveryDurability/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ static void Filldb(int stopAfterIteration = 5)

// Console.WriteLine("add=" + i);

if (!addStatus.InPlaceUpdatedRecord)
if (!addStatus.Record.InPlaceUpdated)
throw new Exception();
}

Expand All @@ -109,7 +109,7 @@ static void Filldb(int stopAfterIteration = 5)
var deleteStatus = session.Delete(ref deteletKey, context2, 1);
// Console.WriteLine("delete=" + i);

if (!deleteStatus.InPlaceUpdatedRecord)
if (!deleteStatus.Record.InPlaceUpdated)
throw new Exception();
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ static void TestData(Storedb store, int count)
var deleteStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test delete=" + i);

if (deleteStatus.Pending)
if (deleteStatus.IsPending)
{
session.CompletePending(true);
context.FinalizeRead(ref deleteStatus, ref output1);
Expand All @@ -215,7 +215,7 @@ static void TestData(Storedb store, int count)
var addStatus = session.Read(ref readKey, ref input1, ref output1, context, 1);
//Console.WriteLine("test add=" + i);

if (addStatus.Pending)
if (addStatus.IsPending)
{
session.CompletePending(true);
context.FinalizeRead(ref addStatus, ref output1);
Expand Down
2 changes: 1 addition & 1 deletion cs/playground/SumStore/ConcurrencyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void Test()
Input input = default;
Output output = default;
var status = session.Read(ref inputArray[i].adId, ref input, ref output, Empty.Default, i);
if (status.Pending)
if (status.IsPending)
throw new NotImplementedException();

inputArray[i].numClicks = output.value;
Expand Down
6 changes: 3 additions & 3 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private unsafe void ProcessBatch(byte* buf, int offset)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));

if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.Found)
serializer.SkipOutput(ref dcurr);
Expand All @@ -186,7 +186,7 @@ private unsafe void ProcessBatch(byte* buf, int offset)

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.IsCompletedSuccessfully)
serializer.SkipOutput(ref dcurr);
Expand Down Expand Up @@ -273,7 +273,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
if (valPtr == null)
status = session.Read(ref key, ref serializer.ReadInputByRef(ref inputPtr), ref serializer.AsRefOutput(outputDcurr, (int)(dend - dcurr)), ctx, 0);

if (!status.Pending)
if (!status.IsPending)
{
// Write six bytes (message | status | sid)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Expand Down
6 changes: 3 additions & 3 deletions cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));

if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));
else if (status.Found)
serializer.SkipOutput(ref dcurr);
Expand All @@ -403,7 +403,7 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
if (status.Pending)
if (status.IsPending)
Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr));

if (subscribeKVBroker != null)
Expand Down Expand Up @@ -589,7 +589,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
if (valPtr == null)
status = session.Read(ref key, ref serializer.ReadInputByRef(ref inputPtr), ref serializer.AsRefOutput(outputDcurr, (int)(dend - dcurr)), ctx, 0);

if (!status.Pending)
if (!status.IsPending)
{
// Write six bytes (message | status | sid)
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/AzureBackedStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static void Main()

var status = s.Read(ref key, ref output, context);

if (!status.Pending)
if (!status.IsPending)
{
if (status.Found && output == context)
Console.WriteLine("Success!");
Expand Down Expand Up @@ -94,7 +94,7 @@ static void Main()

var status = s.Read(ref key, ref output, context);

if (!status.Pending)
if (!status.IsPending)
{
if (status.Found && output == context)
Console.WriteLine("Success!");
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/CacheStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private static void RandomReadWorkload(FasterKV<CacheKey, CacheValue> store, int
var key = new CacheKey(k);
var status = s.Read(ref key, ref output);

if (status.Pending)
if (status.IsPending)
{
statusPending++;
if (statusPending % 100 == 0)
Expand Down Expand Up @@ -186,7 +186,7 @@ private static void InteractiveReadWorkload(FasterKV<CacheKey, CacheValue> store

context.ticks = Stopwatch.GetTimestamp();
var status = s.Read(ref key, ref output, context);
if (status.Pending)
if (status.IsPending)
{
s.CompletePending(true);
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/HelloWorld/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void DiskSample()
// Reads from disk will return PENDING status, result available via either asynchronous IFunctions callback
// or on this thread via CompletePendingWithOutputs, shown below
status = session.Read(ref key, ref output);
if (status.Pending)
if (status.IsPending)
{
session.CompletePendingWithOutputs(out var iter, true);
while (iter.Next())
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/ReadAddress/VersionedReadApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static void ScanStore(FasterKV<Key, Value> store, int keyValue)
var status = session.Read(ref key, ref input, ref output, ref recordMetadata, ReadFlags.SkipCopyReads, serialNo: maxLap + 1);

// This will wait for each retrieved record; not recommended for performance-critical code or when retrieving multiple records unless necessary.
if (status.Pending)
if (status.IsPending)
{
session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
using (completedOutputs)
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/StoreAsyncApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ static async Task AsyncOperator(int id)
if (asyncUpsert)
{
var r = await session.UpsertAsync(ref key, ref value, context, seqNo++);
while (r.Status.Pending)
while (r.Status.IsPending)
r = await r.CompleteAsync();
}
else
Expand Down Expand Up @@ -130,7 +130,7 @@ static async Task AsyncOperator(int id)
for (int i = 0; i < batchSize; i++)
{
var r = await taskBatch[i];
while (r.Status.Pending)
while (r.Status.IsPending)
r = await r.CompleteAsync();
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreCustomTypes/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static void Main()
status = s.Read(ref key, ref input, ref g2, context, 0);

// We will receive the result via ReadCompletionCallback in Functions
if (!status.Pending)
if (!status.IsPending)
Console.WriteLine("Error!");

// End session when done
Expand Down
4 changes: 2 additions & 2 deletions cs/samples/StoreDiskReadBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static async Task AsyncReadOperator(int id)
var result = session.Read(ref key, ref input, ref output, Empty.Default, 0);
if (readBatching)
{
if (!result.Pending)
if (!result.IsPending)
{
if (output.value.vfield1 != key.key)
{
Expand All @@ -166,7 +166,7 @@ static async Task AsyncReadOperator(int id)
}
else
{
if (result.Pending)
if (result.IsPending)
{
session.CompletePending(true);
}
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/AsciiSumSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void Run()
store.Log.FlushAndEvict(true); // Flush and evict all records to disk
var _status = s.RMW(_key, _input); // CopyUpdater to 270 (due to immutable source value on disk)

if (_status.Pending)
if (_status.IsPending)
{
Console.WriteLine("Error!");
return;
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/MemoryByteSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Span.Fill((byte)valLen);

if (status.Pending)
if (status.IsPending)
s.CompletePending(true);
else
{
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/MemoryIntSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Span.Fill(valLen);

if (status.Pending)
if (status.IsPending)
s.CompletePending(true);
else
{
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreVarLenTypes/SpanByteSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void Run()
var expectedValue = valueMem.Slice(0, valLen);
expectedValue.Fill((byte)valLen);

if (status.Pending)
if (status.IsPending)
{
s.CompletePending(true);
}
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Async/DeleteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ internal DeleteAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Va
/// <summary>Complete the Delete operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
/// <returns>ValueTask for Delete result. User needs to await again if result status is Status.PENDING.</returns>
public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(CancellationToken token = default)
=> this.Status.Pending
=> this.Status.IsPending
? updateAsyncInternal.CompleteAsync(token)
: new ValueTask<DeleteAsyncResult<Input, Output, Context>>(new DeleteAsyncResult<Input, Output, Context>(this.Status));

/// <summary>Complete the Delete operation, issuing additional I/O synchronously if needed.</summary>
/// <returns>Status of Delete operation</returns>
public Status Complete() => this.Status.Pending ? updateAsyncInternal.Complete().Status : this.Status;
public Status Complete() => this.Status.IsPending ? updateAsyncInternal.Complete().Status : this.Status;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
10 changes: 5 additions & 5 deletions cs/src/core/Async/RMWAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
pendingContext.serialNum, asyncOp, out flushEvent, out newDiskRequest);
output = pendingContext.output;

if (status.Pending && !newDiskRequest.IsDefault())
if (status.IsPending && !newDiskRequest.IsDefault())
{
flushEvent = default;
this.diskRequest = newDiskRequest;
Expand All @@ -56,7 +56,7 @@ public bool CompletePendingIO(IFasterSession<Key, Value, Input, Output, Context>
var status = completedOutputs.Next() ? completedOutputs.Current.Status : new(StatusCode.Error);
completedOutputs.Dispose();
this.diskRequest = default;
return !status.Pending;
return !status.IsPending;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -109,7 +109,7 @@ internal RmwAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value
/// <summary>Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
/// <returns>ValueTask for RMW result. User needs to await again if result status is pending.</returns>
public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(CancellationToken token = default)
=> this.Status.Pending
=> this.Status.IsPending
? updateAsyncInternal.CompleteAsync(token)
: new ValueTask<RmwAsyncResult<Input, TOutput, Context>>(new RmwAsyncResult<Input, TOutput, Context>(this.Status, this.Output, this.RecordMetadata));

Expand All @@ -122,7 +122,7 @@ public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(Cancella
/// <returns>Status of RMW operation</returns>
public (Status status, TOutput output) Complete(out RecordMetadata recordMetadata)
{
if (!this.Status.Pending)
if (!this.Status.IsPending)
{
recordMetadata = this.RecordMetadata;
return (this.Status, this.Output);
Expand Down Expand Up @@ -154,7 +154,7 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
{
Output output = default;
var status = CallInternalRMW(fasterSession, currentCtx, ref pcontext, ref key, ref input, ref output, context, serialNo, asyncOp: true, out flushEvent, out diskRequest);
if (!status.Pending)
if (!status.IsPending)
return new ValueTask<RmwAsyncResult<Input, Output, Context>>(new RmwAsyncResult<Input, Output, Context>(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
finally
Expand Down
Loading