From 1b319a538d8a6c1a724277f4f6fc091a8a6ed028 Mon Sep 17 00:00:00 2001 From: Ted Hart <15467143+TedHartMS@users.noreply.github.com> Date: Wed, 7 Jul 2021 20:28:07 -0700 Subject: [PATCH] [C#] Add Output to RMW (#515) * Add Output to RMW: - ** BREAKING CHANGE ** I(Advanced)Functions.CopyUpdater, InitialUpdater, InPlaceUpdater, and RMWCompletionCallback now have a ref Output parameter - ** BREAKING CHANGE ** RmwAsyncResult.Complete now returns a (Status, Output) tuple * Fix Remote IFunctions implementations to new RMW signatures * Change RMW callback signature (move Output to end) * Update Remote to new RMW Output signature; add $ReadPercentages to run_benchmark * Add Output to NeedCopyUpdate as well * Propagate RMW Output throughout Remote (ICallbackFunctions etc.); update docs for RMW Output --- cs/benchmark/Functions.cs | 8 +- cs/benchmark/scripts/run_benchmark.ps1 | 21 ++- cs/playground/SumStore/SumStoreTypes.cs | 6 +- cs/remote/benchmark/FASTER.benchmark/Types.cs | 2 +- cs/remote/samples/FixedLenClient/Functions.cs | 20 ++- cs/remote/samples/FixedLenClient/Program.cs | 37 +++-- cs/remote/samples/FixedLenServer/Types.cs | 17 ++- .../FASTER.client/CallbackFunctionsBase.cs | 2 +- cs/remote/src/FASTER.client/ClientSession.cs | 67 ++++++-- .../src/FASTER.client/ClientSessionAsync.cs | 8 +- .../src/FASTER.client/ICallbackFunctions.cs | 3 +- .../src/FASTER.client/MemoryFunctionsBase.cs | 3 +- .../src/FASTER.server/BinaryServerSession.cs | 13 +- .../FasterKVServerSessionBase.cs | 2 +- .../src/FASTER.server/ServerKVFunctions.cs | 22 +-- .../FASTER.remote.test/VarLenMemoryClient.cs | 2 +- cs/samples/ReadAddress/Types.cs | 2 +- .../StoreCheckpointRecover/Functions.cs | 6 +- cs/samples/StoreCustomTypes/Functions.cs | 6 +- cs/samples/StoreDiskReadBenchmark/Types.cs | 6 +- .../AsciiSumSpanByteFunctions.cs | 6 +- cs/src/core/Async/DeleteAsync.cs | 5 +- cs/src/core/Async/RMWAsync.cs | 54 ++++--- cs/src/core/Async/UpdateAsync.cs | 11 +- cs/src/core/Async/UpsertAsync.cs | 5 +- .../ClientSession/AdvancedClientSession.cs | 71 ++++++--- cs/src/core/ClientSession/ClientSession.cs | 71 ++++++--- cs/src/core/Index/Common/CompletedOutput.cs | 8 +- cs/src/core/Index/FASTER/FASTER.cs | 4 +- cs/src/core/Index/FASTER/FASTERImpl.cs | 49 +++--- cs/src/core/Index/FASTER/FASTERLegacy.cs | 23 +-- cs/src/core/Index/FASTER/FASTERThread.cs | 4 +- .../Index/FASTER/LogCompactionFunctions.cs | 10 +- cs/src/core/Index/Interfaces/FunctionsBase.cs | 36 ++--- .../Index/Interfaces/IAdvancedFunctions.cs | 15 +- cs/src/core/Index/Interfaces/IFunctions.cs | 15 +- .../core/Index/Interfaces/TryAddFunctions.cs | 4 +- cs/src/core/VarLen/MemoryFunctions.cs | 6 +- cs/src/core/VarLen/SpanByteFunctions.cs | 6 +- cs/test/AsyncTests.cs | 8 +- cs/test/BasicFASTERTests.cs | 12 +- cs/test/CompletePendingTests.cs | 42 ++--- cs/test/FunctionPerSessionTests.cs | 12 +- cs/test/LockTests.cs | 2 +- cs/test/MemoryLogCompactionTests.cs | 2 +- cs/test/NativeReadCacheTests.cs | 12 +- cs/test/NeedCopyUpdateTests.cs | 8 +- cs/test/ObjectRecoveryTest2.cs | 8 +- cs/test/ObjectRecoveryTestTypes.cs | 8 +- cs/test/ObjectTestTypes.cs | 28 ++-- cs/test/ReadAddressTests.cs | 4 +- cs/test/RecoverContinueTests.cs | 8 +- cs/test/RecoveryTestTypes.cs | 8 +- cs/test/SimpleAsyncTests.cs | 71 ++++++--- cs/test/SimpleRecoveryTest.cs | 8 +- cs/test/TestTypes.cs | 143 ++++++++++++------ cs/test/VLTestTypes.cs | 4 +- docs/_docs/20-fasterkv-basics.md | 13 +- docs/_docs/50-remote-basics.md | 16 +- 59 files changed, 678 insertions(+), 395 deletions(-) diff --git a/cs/benchmark/Functions.cs b/cs/benchmark/Functions.cs index 310106d24..b5112062c 100644 --- a/cs/benchmark/Functions.cs +++ b/cs/benchmark/Functions.cs @@ -15,7 +15,7 @@ public struct Functions : IFunctions public Functions(bool locking) => this.locking = locking; - public void RMWCompletionCallback(ref Key key, ref Input input, Empty ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status) { } @@ -65,20 +65,20 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) // RMW functions [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void InitialUpdater(ref Key key, ref Input input, ref Value value) + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { value.value = input.value; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { value.value += input.value; return true; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { newValue.value = input.value + oldValue.value; } diff --git a/cs/benchmark/scripts/run_benchmark.ps1 b/cs/benchmark/scripts/run_benchmark.ps1 index 475fbf368..d94171d90 100644 --- a/cs/benchmark/scripts/run_benchmark.ps1 +++ b/cs/benchmark/scripts/run_benchmark.ps1 @@ -26,7 +26,7 @@ Number of seconds to run the experiment. Used primarily to debug changes to this script or do a quick one-off run; the default is 30 seconds. -.PARAMETER NumThreads +.PARAMETER ThreadCount Number of threads to use. Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. @@ -34,6 +34,13 @@ Locking mode to use: 0 = No locking, 1 = RecordInfo locking Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. +.PARAMETER ReadPercentages + Keys the Operation to perform: An array of one or more of: + 0 = No read (Upsert workload only) + 100 = All reads + Between 0 and 100 = mix of reads and upserts + -1 = All RMWs + .PARAMETER UseRecover Recover the FasterKV from a checkpoint of a previous run rather than loading it from data. Used primarily to debug changes to this script or do a quick one-off run; the default is false. @@ -63,9 +70,9 @@ Runs 3 directories. .EXAMPLE - pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild " + pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -ReadPercentages -1 " - Clones the master branch to the .\master folder, the branch_with_my_changes to the branch_with_my_changes folder, and runs those with any specified. + Runs an RMW-only workload .EXAMPLE pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild " @@ -76,9 +83,10 @@ param ( [Parameter(Mandatory=$true)] [string[]]$ExeDirs, [Parameter(Mandatory=$false)] [int]$RunSeconds = 30, [Parameter(Mandatory=$false)] [int]$ThreadCount = -1, - [Parameter(Mandatory=$false)] [int]$lockMode = -1, + [Parameter(Mandatory=$false)] [int]$LockMode = -1, + [Parameter(Mandatory=$false)] [int[]]$ReadPercentages, [Parameter(Mandatory=$false)] [switch]$UseRecover, - [Parameter(Mandatory=$false)] [switch]$CloneAndBuild. + [Parameter(Mandatory=$false)] [switch]$CloneAndBuild, [Parameter(Mandatory=$false)] [switch]$NetCore31 ) @@ -136,6 +144,9 @@ if ($ThreadCount -ge 0) { if ($LockMode -ge 0) { $lockModes = ($LockMode) } +if ($ReadPercentages) { + $readPercents = $ReadPercentages +} if ($UseRecover) { $k = "-k" } diff --git a/cs/playground/SumStore/SumStoreTypes.cs b/cs/playground/SumStore/SumStoreTypes.cs index a7467f5bd..60fd1be15 100644 --- a/cs/playground/SumStore/SumStoreTypes.cs +++ b/cs/playground/SumStore/SumStoreTypes.cs @@ -56,18 +56,18 @@ public override void ConcurrentReader(ref AdId key, ref Input input, ref NumClic } // RMW functions - public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value) + public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output) { value = input.numClicks; } - public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue.numClicks = oldValue.numClicks + input.numClicks.numClicks; } diff --git a/cs/remote/benchmark/FASTER.benchmark/Types.cs b/cs/remote/benchmark/FASTER.benchmark/Types.cs index 81f73fb88..372e24f1c 100644 --- a/cs/remote/benchmark/FASTER.benchmark/Types.cs +++ b/cs/remote/benchmark/FASTER.benchmark/Types.cs @@ -58,7 +58,7 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp { } - public void RMWCompletionCallback(ref Key key, ref Input input, Empty ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status) { } diff --git a/cs/remote/samples/FixedLenClient/Functions.cs b/cs/remote/samples/FixedLenClient/Functions.cs index a98b8bf3a..7bf04cf9d 100644 --- a/cs/remote/samples/FixedLenClient/Functions.cs +++ b/cs/remote/samples/FixedLenClient/Functions.cs @@ -15,18 +15,30 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo { if (ctx == 0) { - if (status != Status.OK || key + 10000 != output) - throw new Exception("Incorrect read result"); + var expected = key + 10000; + if (status != Status.OK || expected != output) + throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}"); } else if (ctx == 1) { - if (status != Status.OK || key + 10000 + 25 + 25 != output) - throw new Exception("Incorrect read result"); + var expected = key + 10000 + 25 + 25; + if (status != Status.OK || expected != output) + throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}"); } else { throw new Exception("Unexpected user context"); } } + + public override void RMWCompletionCallback(ref long key, ref long input, ref long output, byte ctx, Status status) + { + if (ctx == 1) + { + var expected = key + 10000 + 25 + 25 + 25; + if (status != Status.OK || expected != output) + throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}"); + } + } } } diff --git a/cs/remote/samples/FixedLenClient/Program.cs b/cs/remote/samples/FixedLenClient/Program.cs index e8a527da4..0cdd46b9b 100644 --- a/cs/remote/samples/FixedLenClient/Program.cs +++ b/cs/remote/samples/FixedLenClient/Program.cs @@ -82,6 +82,10 @@ static void SyncSamples(ClientSession public bool SupportsLocking => false; // Callbacks - public void RMWCompletionCallback(ref Key key, ref Input input, long ctx, Status status) { } + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { } public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { } @@ -90,17 +90,26 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) // RMW functions [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void InitialUpdater(ref Key key, ref Input input, ref Value value) => value.value = input.value; + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) + { + value.value = input.value; + output.value = value; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { Interlocked.Add(ref value.value, input.value); + output.value = value; return true; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) => newValue.value = input.value + oldValue.value; + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) + { + newValue.value = input.value + oldValue.value; + output.value = newValue; + } public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { } diff --git a/cs/remote/src/FASTER.client/CallbackFunctionsBase.cs b/cs/remote/src/FASTER.client/CallbackFunctionsBase.cs index f1b9540f1..c873d7ef4 100644 --- a/cs/remote/src/FASTER.client/CallbackFunctionsBase.cs +++ b/cs/remote/src/FASTER.client/CallbackFunctionsBase.cs @@ -20,7 +20,7 @@ public virtual void DeleteCompletionCallback(ref Key key, Context ctx) { } /// public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// - public virtual void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) { } + public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { } } diff --git a/cs/remote/src/FASTER.client/ClientSession.cs b/cs/remote/src/FASTER.client/ClientSession.cs index 025d7d2be..ab2a029e0 100644 --- a/cs/remote/src/FASTER.client/ClientSession.cs +++ b/cs/remote/src/FASTER.client/ClientSession.cs @@ -148,7 +148,22 @@ public Status Read(Key key, Input input, out Output output, Context userContext /// Serial number /// Status of operation public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0) - => InternalRMW(MessageType.RMW, ref key, ref input, userContext, serialNo); + { + Output output = default; + return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo); + } + + /// + /// RMW (read-modify-write) operation + /// + /// Key + /// Input + /// Output + /// User context + /// Serial number + /// Status of operation + public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) + => InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo); /// /// RMW (read-modify-write) operation @@ -159,7 +174,25 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l /// Serial number /// Status of operation public Status RMW(Key key, Input input, Context userContext = default, long serialNo = 0) - => InternalRMW(MessageType.RMW, ref key, ref input, userContext, serialNo); + { + Output output = default; + return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo); + } + + /// + /// RMW (read-modify-write) operation + /// + /// Key + /// Input + /// Output + /// User context + /// Serial number + /// Status of operation + public Status RMW(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0) + { + output = default; + return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo); + } /// /// Delete operation @@ -323,13 +356,18 @@ internal void ProcessReplies(byte[] buf, int offset) { var status = ReadStatus(ref src); var result = readrmwQueue.Dequeue(); - if (status == Status.PENDING) + if (status == Status.OK || status == Status.NOTFOUND) + { + result.Item3 = serializer.ReadOutput(ref src); + functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status); + } + else if (status == Status.PENDING) { var p = hrw.ReadPendingSeqNo(ref src); readRmwPendingContext.Add(p, result); } else - functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, result.Item4, status); + functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status); break; } case MessageType.RMWAsync: @@ -337,7 +375,9 @@ internal void ProcessReplies(byte[] buf, int offset) var status = ReadStatus(ref src); var result = readrmwQueue.Dequeue(); var tcs = tcsQueue.Dequeue(); - if (status == Status.PENDING) + if (status == Status.OK || status == Status.NOTFOUND) + tcs.SetResult((status, serializer.ReadOutput(ref src))); + else if (status == Status.PENDING) { var p = hrw.ReadPendingSeqNo(ref src); readRmwPendingTcs.Add(p, tcs); @@ -427,7 +467,13 @@ private void HandlePending(ref byte* src) readRmwPendingContext.TryGetValue(p, out var result); readRmwPendingContext.Remove(p); #endif - functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, result.Item4, status); + if (status == Status.OK || status == Status.NOTFOUND) + { + result.Item3 = serializer.ReadOutput(ref src); + functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status); + } + else + functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status); break; } case MessageType.RMWAsync: @@ -439,7 +485,10 @@ private void HandlePending(ref byte* src) readRmwPendingTcs.TryGetValue(p, out var result); readRmwPendingTcs.Remove(p); #endif - result.SetResult((status, default)); + if (status == Status.OK || status == Status.NOTFOUND) + result.SetResult((status, serializer.ReadOutput(ref src))); + else + result.SetResult((status, default)); break; } default: @@ -528,7 +577,7 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, Context userContext = default, long serialNo = 0) + private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) { while (true) { @@ -540,7 +589,7 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu { numMessages++; offset = curr; - readrmwQueue.Enqueue((key, input, default, userContext)); + readrmwQueue.Enqueue((key, input, output, userContext)); return Status.PENDING; } Flush(); diff --git a/cs/remote/src/FASTER.client/ClientSessionAsync.cs b/cs/remote/src/FASTER.client/ClientSessionAsync.cs index 9878a7006..6d5ecba65 100644 --- a/cs/remote/src/FASTER.client/ClientSessionAsync.cs +++ b/cs/remote/src/FASTER.client/ClientSessionAsync.cs @@ -52,14 +52,14 @@ public async Task UpsertAsync(Key key, Value value, bool forceFlush = true) /// Input /// Force immediate flush of message buffer /// Async result of RMW operation (status) - public async Task RMWAsync(Key key, Input input, bool forceFlush = true) + public Task<(Status, Output)> RMWAsync(Key key, Input input, bool forceFlush = true) { var tcs = new TaskCompletionSource<(Status, Output)>(TaskCreationOptions.RunContinuationsAsynchronously); - InternalRMW(MessageType.RMWAsync, ref key, ref input); + Output output = default; + InternalRMW(MessageType.RMWAsync, ref key, ref input, ref output); tcsQueue.Enqueue(tcs); if (forceFlush) Flush(); - (var status, _) = await tcs.Task; - return status; + return tcs.Task; } /// diff --git a/cs/remote/src/FASTER.client/ICallbackFunctions.cs b/cs/remote/src/FASTER.client/ICallbackFunctions.cs index 9cc378519..09136ba73 100644 --- a/cs/remote/src/FASTER.client/ICallbackFunctions.cs +++ b/cs/remote/src/FASTER.client/ICallbackFunctions.cs @@ -38,9 +38,10 @@ public interface ICallbackFunctions /// /// /// + /// /// /// - void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status); + void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status); /// /// Delete completion diff --git a/cs/remote/src/FASTER.client/MemoryFunctionsBase.cs b/cs/remote/src/FASTER.client/MemoryFunctionsBase.cs index ea5ea328c..4c13f652d 100644 --- a/cs/remote/src/FASTER.client/MemoryFunctionsBase.cs +++ b/cs/remote/src/FASTER.client/MemoryFunctionsBase.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using FASTER.common; using System; using System.Buffers; @@ -21,7 +20,7 @@ public virtual void DeleteCompletionCallback(ref ReadOnlyMemory key, byte ctx public virtual void ReadCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory input, ref (IMemoryOwner, int) output, byte ctx, Status status) { } /// - public virtual void RMWCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory input, byte ctx, Status status) { } + public virtual void RMWCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory input, ref (IMemoryOwner, int) output, byte ctx, Status status) { } /// public virtual void UpsertCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory value, byte ctx) { } diff --git a/cs/remote/src/FASTER.server/BinaryServerSession.cs b/cs/remote/src/FASTER.server/BinaryServerSession.cs index ed84faf14..27383313a 100644 --- a/cs/remote/src/FASTER.server/BinaryServerSession.cs +++ b/cs/remote/src/FASTER.server/BinaryServerSession.cs @@ -65,18 +65,20 @@ public override void CompleteRead(ref Output output, long ctx, Status status) msgnum++; } - public override void CompleteRMW(long ctx, Status status) + public override void CompleteRMW(ref Output output, long ctx, Status status) { byte* d = responseObject.obj.bufferPtr; var dend = d + responseObject.obj.buffer.Length; - if ((int)(dend - dcurr) < 7) + if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); hrw.Write(MessageType.PendingResult, ref dcurr, (int)(dend - dcurr)); hrw.Write((MessageType)(ctx >> 32), ref dcurr, (int)(dend - dcurr)); Write((int)(ctx & 0xffffffff), ref dcurr, (int)(dend - dcurr)); Write(ref status, ref dcurr, (int)(dend - dcurr)); + if (status == Status.OK || status == Status.NOTFOUND) + serializer.Write(ref output, ref dcurr, (int)(dend - dcurr)); msgnum++; } @@ -156,16 +158,19 @@ private unsafe void ProcessBatch(byte[] buf, int offset) case MessageType.RMW: case MessageType.RMWAsync: - if ((int)(dend - dcurr) < 2) + if ((int)(dend - dcurr) < 2 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); ctx = ((long)message << 32) | (long)pendingSeqNo; - status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src), ctx); + status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src), + ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx); hrw.Write(message, ref dcurr, (int)(dend - dcurr)); Write(ref status, ref dcurr, (int)(dend - dcurr)); if (status == Status.PENDING) Write(pendingSeqNo++, ref dcurr, (int)(dend - dcurr)); + else if (status == Status.OK || status == Status.NOTFOUND) + serializer.SkipOutput(ref dcurr); break; case MessageType.Delete: diff --git a/cs/remote/src/FASTER.server/FasterKVServerSessionBase.cs b/cs/remote/src/FASTER.server/FasterKVServerSessionBase.cs index 32e7bf440..17f5bca72 100644 --- a/cs/remote/src/FASTER.server/FasterKVServerSessionBase.cs +++ b/cs/remote/src/FASTER.server/FasterKVServerSessionBase.cs @@ -21,7 +21,7 @@ public FasterKVServerSessionBase(Socket socket, FasterKV store, Func } public abstract void CompleteRead(ref Output output, long ctx, Status status); - public abstract void CompleteRMW(long ctx, Status status); + public abstract void CompleteRMW(ref Output output, long ctx, Status status); public override void Dispose() diff --git a/cs/remote/src/FASTER.server/ServerKVFunctions.cs b/cs/remote/src/FASTER.server/ServerKVFunctions.cs index 9cff220d7..5e415a3f0 100644 --- a/cs/remote/src/FASTER.server/ServerKVFunctions.cs +++ b/cs/remote/src/FASTER.server/ServerKVFunctions.cs @@ -30,20 +30,20 @@ public void ConcurrentReader(ref Key key, ref Input input, ref Value value, ref public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) => functions.ConcurrentWriter(ref key, ref src, ref dst); - public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) - => functions.NeedCopyUpdate(ref key, ref input, ref oldValue); + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) + => functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output); - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) - => functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue); + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) + => functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); public void DeleteCompletionCallback(ref Key key, long ctx) => functions.DeleteCompletionCallback(ref key, ctx); - public void InitialUpdater(ref Key key, ref Input input, ref Value value) - => functions.InitialUpdater(ref key, ref input, ref value); + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) + => functions.InitialUpdater(ref key, ref input, ref value, ref output); - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) - => functions.InPlaceUpdater(ref key, ref input, ref value); + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) + => functions.InPlaceUpdater(ref key, ref input, ref value, ref output); public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { @@ -51,10 +51,10 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status); } - public void RMWCompletionCallback(ref Key key, ref Input input, long ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { - serverNetworkSession.CompleteRMW(ctx, status); - functions.RMWCompletionCallback(ref key, ref input, ctx, status); + serverNetworkSession.CompleteRMW(ref output, ctx, status); + functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status); } public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst) diff --git a/cs/remote/test/FASTER.remote.test/VarLenMemoryClient.cs b/cs/remote/test/FASTER.remote.test/VarLenMemoryClient.cs index 57e63e03c..2b161d1cf 100644 --- a/cs/remote/test/FASTER.remote.test/VarLenMemoryClient.cs +++ b/cs/remote/test/FASTER.remote.test/VarLenMemoryClient.cs @@ -53,7 +53,7 @@ public virtual void ReadCompletionCallback(ref ReadOnlyMemory key, ref Read } /// - public virtual void RMWCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory input, long ctx, Status status) { } + public virtual void RMWCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory input, ref (IMemoryOwner, int) output, long ctx, Status status) { } /// public virtual void UpsertCompletionCallback(ref ReadOnlyMemory key, ref ReadOnlyMemory value, long ctx) { } diff --git a/cs/samples/ReadAddress/Types.cs b/cs/samples/ReadAddress/Types.cs index 948501212..f1306a949 100644 --- a/cs/samples/ReadAddress/Types.cs +++ b/cs/samples/ReadAddress/Types.cs @@ -45,7 +45,7 @@ public class Functions : AdvancedSimpleFunctions // Return false to force a chain of values. public override bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => false; - public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref RecordInfo recordInfo, long address) => false; + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) => false; // Track the recordInfo for its PreviousAddress. public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status, RecordInfo recordInfo) diff --git a/cs/samples/StoreCheckpointRecover/Functions.cs b/cs/samples/StoreCheckpointRecover/Functions.cs index fc1d0e1ed..490cda9ed 100644 --- a/cs/samples/StoreCheckpointRecover/Functions.cs +++ b/cs/samples/StoreCheckpointRecover/Functions.cs @@ -8,9 +8,9 @@ namespace StoreCheckpointRecover { public sealed class Functions : FunctionsBase { - public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) => value.value = input.value; - public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) => newValue = oldValue; - public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value) { value.value += input.value; return true; } + public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) => value.value = input.value; + public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) => newValue = oldValue; + public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value.value += input.value; return true; } public override void SingleReader(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput dst) diff --git a/cs/samples/StoreCustomTypes/Functions.cs b/cs/samples/StoreCustomTypes/Functions.cs index a1d09613f..64de9a7e4 100644 --- a/cs/samples/StoreCustomTypes/Functions.cs +++ b/cs/samples/StoreCustomTypes/Functions.cs @@ -8,9 +8,9 @@ namespace StoreCustomTypes { public sealed class Functions : FunctionsBase { - public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) => value.value = input.value; - public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) => newValue = oldValue; - public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value) { value.value += input.value; return true; } + public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) => value.value = input.value; + public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) => newValue = oldValue; + public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value.value += input.value; return true; } public override void SingleReader(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput dst) => dst.value = value; diff --git a/cs/samples/StoreDiskReadBenchmark/Types.cs b/cs/samples/StoreDiskReadBenchmark/Types.cs index fa41feac6..a301365a9 100644 --- a/cs/samples/StoreDiskReadBenchmark/Types.cs +++ b/cs/samples/StoreDiskReadBenchmark/Types.cs @@ -62,15 +62,15 @@ public override void ConcurrentReader(ref Key key, ref Input input, ref Value va { if (dst == null) dst = new Output(); dst.value = value; } // RMW functions - public override void InitialUpdater(ref Key key, ref Input input, ref Value value) + public override void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { value.vfield1 = input.ifield1; } - public override void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) + public override void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { newValue.vfield1 = oldValue.vfield1 + input.ifield1; } - public override bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) + public override bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { value.vfield1 += input.ifield1; return true; diff --git a/cs/samples/StoreVarLenTypes/AsciiSumSpanByteFunctions.cs b/cs/samples/StoreVarLenTypes/AsciiSumSpanByteFunctions.cs index 4949309ff..9d59c8921 100644 --- a/cs/samples/StoreVarLenTypes/AsciiSumSpanByteFunctions.cs +++ b/cs/samples/StoreVarLenTypes/AsciiSumSpanByteFunctions.cs @@ -17,13 +17,13 @@ public sealed class AsciiSumSpanByteFunctions : SpanByteFunctions public AsciiSumSpanByteFunctions(MemoryPool memoryPool = null, bool locking = false) : base(memoryPool, locking) { } /// - public override void InitialUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value) + public override void InitialUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output) { input.CopyTo(ref value); } /// - public override bool InPlaceUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value) + public override bool InPlaceUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output) { long curr = Utils.BytesToLong(value.AsSpan()); long next = curr + Utils.BytesToLong(input.AsSpan()); @@ -33,7 +33,7 @@ public override bool InPlaceUpdater(ref SpanByte key, ref SpanByte input, ref Sp } /// - public override void CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue) + public override void CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output) { long curr = Utils.BytesToLong(oldValue.AsSpan()); long next = curr + Utils.BytesToLong(input.AsSpan()); diff --git a/cs/src/core/Async/DeleteAsync.cs b/cs/src/core/Async/DeleteAsync.cs index a3bd8dd47..48b408641 100644 --- a/cs/src/core/Async/DeleteAsync.cs +++ b/cs/src/core/Async/DeleteAsync.cs @@ -40,9 +40,6 @@ public ValueTask> DoSlowOperation(Fast /// public void DecrementPending(FasterExecutionContext currentCtx, ref PendingContext pendingContext) { } - - /// - public Status GetStatus(DeleteAsyncResult asyncResult) => asyncResult.Status; } /// @@ -78,7 +75,7 @@ public ValueTask> CompleteAsync(Cancel /// Complete the Delete operation, issuing additional I/O synchronously if needed. /// Status of Delete operation - public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete(); + public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete().Status; } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/Async/RMWAsync.cs b/cs/src/core/Async/RMWAsync.cs index f9c8fdb9d..3276aa652 100644 --- a/cs/src/core/Async/RMWAsync.cs +++ b/cs/src/core/Async/RMWAsync.cs @@ -19,18 +19,18 @@ internal struct RmwAsyncOperation : IUpdateAsyncOperatio internal RmwAsyncOperation(AsyncIOContext diskRequest) => this.diskRequest = diskRequest; /// - public RmwAsyncResult CreateResult(Status status, Output output) => new RmwAsyncResult(status, output); + public RmwAsyncResult CreateResult(Status status, Output output) => new(status, output); /// public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession, FasterExecutionContext currentCtx, bool asyncOp, out CompletionEvent flushEvent, out Output output) { - output = default; flushEvent = fasterKV.hlog.FlushEvent; Status status = !this.diskRequest.IsDefault() ? fasterKV.InternalCompletePendingRequestFromContext(currentCtx, currentCtx, fasterSession, this.diskRequest, ref pendingContext, asyncOp, out AsyncIOContext newDiskRequest) - : fasterKV.CallInternalRMW(fasterSession, currentCtx, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), pendingContext.userContext, + : fasterKV.CallInternalRMW(fasterSession, currentCtx, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, pendingContext.userContext, pendingContext.serialNum, asyncOp, out flushEvent, out newDiskRequest); + output = pendingContext.output; if (status == Status.PENDING && !newDiskRequest.IsDefault()) { @@ -65,49 +65,54 @@ public void DecrementPending(FasterExecutionContext curr currentCtx.asyncPendingCount--; currentCtx.pendingReads.Remove(); } - - /// - public Status GetStatus(RmwAsyncResult asyncResult) => asyncResult.Status; } /// /// State storage for the completion of an async RMW, or the result if the RMW was completed synchronously /// - public struct RmwAsyncResult + public struct RmwAsyncResult { - internal readonly UpdateAsyncInternal, RmwAsyncResult> updateAsyncInternal; - internal readonly Output output; + internal readonly UpdateAsyncInternal, RmwAsyncResult> updateAsyncInternal; /// Current status of the RMW operation public Status Status { get; } - internal RmwAsyncResult(Status status, Output output) + /// Output of the RMW operation if current status is not + public TOutput Output { get; } + + internal RmwAsyncResult(Status status, TOutput output) { this.Status = status; - this.output = output; + this.Output = output; this.updateAsyncInternal = default; } - internal RmwAsyncResult(FasterKV fasterKV, IFasterSession fasterSession, - FasterExecutionContext currentCtx, PendingContext pendingContext, + internal RmwAsyncResult(FasterKV fasterKV, IFasterSession fasterSession, + FasterExecutionContext currentCtx, PendingContext pendingContext, AsyncIOContext diskRequest, ExceptionDispatchInfo exceptionDispatchInfo) { Status = Status.PENDING; - output = default; - updateAsyncInternal = new UpdateAsyncInternal, RmwAsyncResult>( - fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new RmwAsyncOperation(diskRequest)); + this.Output = default; + updateAsyncInternal = new UpdateAsyncInternal, RmwAsyncResult>( + fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new RmwAsyncOperation(diskRequest)); } /// Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this. - /// ValueTask for RMW result. User needs to await again if result status is Status.PENDING. - public ValueTask> CompleteAsync(CancellationToken token = default) + /// ValueTask for RMW result. User needs to await again if result status is . + public ValueTask> CompleteAsync(CancellationToken token = default) => this.Status != Status.PENDING - ? new ValueTask>(new RmwAsyncResult(this.Status, default)) + ? new ValueTask>(new RmwAsyncResult(this.Status, this.Output)) : updateAsyncInternal.CompleteAsync(token); /// Complete the RMW operation, issuing additional (rare) I/O synchronously if needed. /// Status of RMW operation - public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete(); + public (Status status, TOutput output) Complete() + { + if (this.Status != Status.PENDING) + return (this.Status, this.Output); + var rmwAsyncResult = updateAsyncInternal.Complete(); + return (rmwAsyncResult.Status, rmwAsyncResult.Output); + } } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -129,9 +134,10 @@ internal ValueTask> RmwAsync>(new RmwAsyncResult(status, default)); + return new ValueTask>(new RmwAsyncResult(status, output)); } finally { @@ -145,7 +151,7 @@ internal ValueTask> RmwAsync(IFasterSession fasterSession, - FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, Context context, long serialNo, + FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, ref Output output, Context context, long serialNo, bool asyncOp, out CompletionEvent flushEvent, out AsyncIOContext diskRequest) { diskRequest = default; @@ -153,7 +159,7 @@ private Status CallInternalRMW(IFasterSession DoSlowOperation(FasterKV fasterKV, IFasterSe /// The for this operation /// The for the pending operation void DecrementPending(FasterExecutionContext currentCtx, ref PendingContext pendingContext); - - /// - /// Returns the current status of the ; usually examined for whether it or not. - /// - /// - /// The current status of the - Status GetStatus(TAsyncResult asyncResult); } internal sealed class UpdateAsyncInternal @@ -116,7 +109,7 @@ internal ValueTask CompleteAsync(CancellationToken token = default return _asyncOperation.DoSlowOperation(_fasterKV, _fasterSession, _currentCtx, _pendingContext, flushEvent, token); } - internal Status Complete() + internal TAsyncResult Complete() { if (!TryCompleteAsyncState(asyncOp: false, out CompletionEvent flushEvent, out TAsyncResult asyncResult)) { @@ -130,7 +123,7 @@ internal Status Complete() flushEvent.Wait(); } } - return _asyncOperation.GetStatus(asyncResult); + return asyncResult; } private bool TryCompleteAsyncState(bool asyncOp, out CompletionEvent flushEvent, out TAsyncResult asyncResult) diff --git a/cs/src/core/Async/UpsertAsync.cs b/cs/src/core/Async/UpsertAsync.cs index b4d997512..a5e9096a7 100644 --- a/cs/src/core/Async/UpsertAsync.cs +++ b/cs/src/core/Async/UpsertAsync.cs @@ -40,9 +40,6 @@ public ValueTask> DoSlowOperation(Fast /// public void DecrementPending(FasterExecutionContext currentCtx, ref PendingContext pendingContext) { } - - /// - public Status GetStatus(UpsertAsyncResult asyncResult) => asyncResult.Status; } /// @@ -78,7 +75,7 @@ public ValueTask> CompleteAsync(Cancel /// Complete the Upsert operation, issuing additional I/O synchronously if needed. /// Status of Upsert operation - public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete(); + public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete().Status; } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/ClientSession/AdvancedClientSession.cs b/cs/src/core/ClientSession/AdvancedClientSession.cs index 23dc22224..e4862605d 100644 --- a/cs/src/core/ClientSession/AdvancedClientSession.cs +++ b/cs/src/core/ClientSession/AdvancedClientSession.cs @@ -523,16 +523,17 @@ public ValueTask.UpsertAsyncResult> /// /// /// + /// /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0) + public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) { if (SupportAsync) UnsafeResumeThread(); try { - return fht.ContextRMW(ref key, ref input, userContext, FasterSession, serialNo, ctx); + return fht.ContextRMW(ref key, ref input, ref output, userContext, FasterSession, serialNo, ctx); } finally { @@ -540,6 +541,37 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l } } + /// + /// RMW operation + /// + /// + /// + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Status RMW(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0) + { + output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } + + /// + /// RMW operation + /// + /// + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0) + { + Output output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } + /// /// RMW operation /// @@ -550,7 +582,10 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status RMW(Key key, Input input, Context userContext = default, long serialNo = 0) - => RMW(ref key, ref input, userContext, serialNo); + { + Output output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } /// /// Async RMW operation @@ -991,12 +1026,12 @@ private void ConcurrentDeleterLock(ref Key key, ref Value value, ref RecordInfo } } - public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) - => _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue); + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) + => _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output); - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { - _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue); + _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); } public void DeleteCompletionCallback(ref Key key, Context ctx) @@ -1014,32 +1049,32 @@ public int GetLength(ref Value t, ref Input input) return _clientSession.variableLengthStruct.GetLength(ref t, ref input); } - public void InitialUpdater(ref Key key, ref Input input, ref Value value) + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { - _clientSession.functions.InitialUpdater(ref key, ref input, ref value); + _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => !this.SupportsLocking - ? InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address) - : InPlaceUpdaterLock(ref key, ref input, ref value, ref recordInfo, address); + ? InPlaceUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address) + : InPlaceUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address); [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref recordInfo, address); + return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); } - private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { long context = 0; this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); try { // KeyIndexes do not need notification of in-place updates because the key does not change. - return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address); + return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address); } finally { @@ -1052,9 +1087,9 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp _clientSession.functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status, recordInfo); } - public void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { - _clientSession.functions.RMWCompletionCallback(ref key, ref input, ctx, status); + _clientSession.functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status); } public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, long address) diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 07ff55e4b..aaacabe08 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -483,16 +483,17 @@ public ValueTask.UpsertAsyncResult> /// /// /// + /// /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0) + public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) { if (SupportAsync) UnsafeResumeThread(); try { - return fht.ContextRMW(ref key, ref input, userContext, FasterSession, serialNo, ctx); + return fht.ContextRMW(ref key, ref input, ref output, userContext, FasterSession, serialNo, ctx); } finally { @@ -500,6 +501,37 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l } } + /// + /// RMW operation + /// + /// + /// + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Status RMW(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0) + { + output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } + + /// + /// RMW operation + /// + /// + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0) + { + Output output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } + /// /// RMW operation /// @@ -510,7 +542,10 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status RMW(Key key, Input input, Context userContext = default, long serialNo = 0) - => RMW(ref key, ref input, userContext, serialNo); + { + Output output = default; + return RMW(ref key, ref input, ref output, userContext, serialNo); + } /// /// Async RMW operation @@ -997,12 +1032,12 @@ private void ConcurrentDeleterLock(ref Key key, ref Value value, ref RecordInfo } } - public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) - => _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue); + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) + => _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output); - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { - _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue); + _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); } public void DeleteCompletionCallback(ref Key key, Context ctx) @@ -1020,31 +1055,31 @@ public int GetLength(ref Value t, ref Input input) return _clientSession.variableLengthStruct.GetLength(ref t, ref input); } - public void InitialUpdater(ref Key key, ref Input input, ref Value value) + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { - _clientSession.functions.InitialUpdater(ref key, ref input, ref value); + _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => !this.SupportsLocking - ? InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address) - : InPlaceUpdaterLock(ref key, ref input, ref value, ref recordInfo, address); + ? InPlaceUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address) + : InPlaceUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address); [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value); + return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output); } - private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { long context = 0; this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); try { - return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address); + return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address); } finally { @@ -1057,9 +1092,9 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp _clientSession.functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status); } - public void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { - _clientSession.functions.RMWCompletionCallback(ref key, ref input, ctx, status); + _clientSession.functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status); } public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, long address) diff --git a/cs/src/core/Index/Common/CompletedOutput.cs b/cs/src/core/Index/Common/CompletedOutput.cs index 9ab8f7fa9..107853595 100644 --- a/cs/src/core/Index/Common/CompletedOutput.cs +++ b/cs/src/core/Index/Common/CompletedOutput.cs @@ -17,7 +17,7 @@ namespace FASTER.core /// The session holds this list and returns an enumeration to the caller of an appropriate CompletePending overload. The session will handle /// disposing and clearing this list, but it is best if the caller calls Dispose() after processing the results, so the key, input, and heap containers /// are released as soon as possible. - public class CompletedOutputIterator : IDisposable + public sealed class CompletedOutputIterator : IDisposable { internal const int kInitialAlloc = 32; internal const int kReallocMultuple = 2; @@ -133,13 +133,11 @@ internal void Dispose() { var tempKeyContainer = keyContainer; keyContainer = default; - if (tempKeyContainer is not null) - tempKeyContainer.Dispose(); + tempKeyContainer?.Dispose(); var tempInputContainer = inputContainer; inputContainer = default; - if (tempInputContainer is not null) - tempInputContainer.Dispose(); + tempInputContainer?.Dispose(); Output = default; Context = default; diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index c9cef24e9..a97f971e0 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -593,7 +593,7 @@ internal Status ContextUpsert(ref Key key } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ContextRMW(ref Key key, ref Input input, Context context, FasterSession fasterSession, long serialNo, + internal Status ContextRMW(ref Key key, ref Input input, ref Output output, Context context, FasterSession fasterSession, long serialNo, FasterExecutionContext sessionCtx) where FasterSession : IFasterSession { @@ -601,7 +601,7 @@ internal Status ContextRMW(ref Key key, r OperationStatus internalStatus; do - internalStatus = InternalRMW(ref key, ref input, ref context, ref pcontext, fasterSession, sessionCtx, serialNo); + internalStatus = InternalRMW(ref key, ref input, ref output, ref context, ref pcontext, fasterSession, sessionCtx, serialNo); while (internalStatus == OperationStatus.RETRY_NOW); Status status; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index af6ad2c37..fe7e0271c 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -248,8 +248,8 @@ internal OperationStatus InternalRead( if (!pendingContext.NoKey && pendingContext.key == default) // If this is true, we don't have a valid key pendingContext.key = hlog.GetKeyContainer(ref key); if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); - pendingContext.output = output; + pendingContext.output = output; if (pendingContext.output is IHeapConvertible heapConvertible) heapConvertible.ConvertToHeap(); @@ -563,9 +563,9 @@ private OperationStatus CreateNewRecordUpsert /// Read-Modify-Write Operation. Updates value of 'key' using 'input' and current value. @@ -574,6 +574,7 @@ private OperationStatus CreateNewRecordUpsert /// key of the record. /// input used to update the value. + /// Location to store output computed from input and value. /// user context corresponding to operation used during completion callback. /// pending context created when the operation goes pending. /// Callback functions. @@ -605,7 +606,7 @@ private OperationStatus CreateNewRecordUpsert [MethodImpl(MethodImplOptions.AggressiveInlining)] internal OperationStatus InternalRMW( - ref Key key, ref Input input, + ref Key key, ref Input input, ref Output output, ref Context userContext, ref PendingContext pendingContext, FasterSession fasterSession, @@ -658,7 +659,7 @@ internal OperationStatus InternalRMW( { ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone - && fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress)) + && fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { hlog.MarkPage(logicalAddress, sessionCtx.version); return OperationStatus.SUCCESS; @@ -689,7 +690,7 @@ internal OperationStatus InternalRMW( Debug.Assert(recordInfo.Version == sessionCtx.version); } - if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress)) + if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version); else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version); @@ -752,7 +753,7 @@ internal OperationStatus InternalRMW( CreateNewRecord: if (latchDestination != LatchDestination.CreatePendingContext) { - status = CreateNewRecordRMW(ref key, ref input, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, tag, entry, latestLogicalAddress); + status = CreateNewRecordRMW(ref key, ref input, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, tag, entry, latestLogicalAddress); if (status != OperationStatus.ALLOCATE_FAILED) goto LatchRelease; latchDestination = LatchDestination.CreatePendingContext; @@ -765,6 +766,11 @@ internal OperationStatus InternalRMW( pendingContext.type = OperationType.RMW; if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); + + pendingContext.output = output; + if (pendingContext.output is IHeapConvertible heapConvertible) + heapConvertible.ConvertToHeap(); + pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; @@ -871,14 +877,14 @@ private LatchDestination AcquireLatchRMW(PendingContext< return LatchDestination.NormalProcessing; } - private OperationStatus CreateNewRecordRMW(ref Key key, ref Input input, ref PendingContext pendingContext, FasterSession fasterSession, + private OperationStatus CreateNewRecordRMW(ref Key key, ref Input input, ref Output output, ref PendingContext pendingContext, FasterSession fasterSession, FasterExecutionContext sessionCtx, HashBucket* bucket, int slot, long logicalAddress, long physicalAddress, ushort tag, HashBucketEntry entry, long latestLogicalAddress) where FasterSession : IFasterSession { if (logicalAddress >= hlog.HeadAddress && !hlog.GetInfo(physicalAddress).Tombstone) { - if (!fasterSession.NeedCopyUpdate(ref key, ref input, ref hlog.GetValue(physicalAddress))) + if (!fasterSession.NeedCopyUpdate(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output)) return OperationStatus.SUCCESS; } @@ -897,21 +903,21 @@ private OperationStatus CreateNewRecordRMW= hlog.HeadAddress) { if (hlog.GetInfo(physicalAddress).Tombstone) { - fasterSession.InitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize)); + fasterSession.InitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output); status = OperationStatus.NOTFOUND; } else { - fasterSession.CopyUpdater(ref key, ref input, - ref hlog.GetValue(physicalAddress), - ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize)); + fasterSession.CopyUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), + ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), + ref output); status = OperationStatus.SUCCESS; } } @@ -1539,7 +1545,7 @@ internal OperationStatus InternalContinuePendingRMW= hlog.BeginAddress) && !hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone) { - if (!fasterSession.NeedCopyUpdate(ref key, ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request))) + if (!fasterSession.NeedCopyUpdate(ref key, ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request), ref pendingContext.output)) { return OperationStatus.SUCCESS; } @@ -1566,16 +1572,16 @@ internal OperationStatus InternalContinuePendingRMW @@ -333,12 +334,12 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref Reco public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) { } - public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) - => _fasterKV._functions.NeedCopyUpdate(ref key, ref input, ref oldValue); + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) + => _fasterKV._functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output); - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { - _fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue); + _fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); } public void DeleteCompletionCallback(ref Key key, Context ctx) @@ -356,14 +357,14 @@ public int GetLength(ref Value t, ref Input input) return _fasterKV._variableLengthStructForInput.GetLength(ref t, ref input); } - public void InitialUpdater(ref Key key, ref Input input, ref Value value) + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { - _fasterKV._functions.InitialUpdater(ref key, ref input, ref value); + _fasterKV._functions.InitialUpdater(ref key, ref input, ref value, ref output); } - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { - return _fasterKV._functions.InPlaceUpdater(ref key, ref input, ref value); + return _fasterKV._functions.InPlaceUpdater(ref key, ref input, ref value, ref output); } public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordInfo recordInfo) @@ -371,9 +372,9 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp _fasterKV._functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status); } - public void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { - _fasterKV._functions.RMWCompletionCallback(ref key, ref input, ctx, status); + _fasterKV._functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status); } public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, long address) diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index a691c9587..e4afe021a 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -213,7 +213,7 @@ internal void InternalCompleteRetryRequest public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) => true; - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) { } + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { } public void DeleteCompletionCallback(ref Key key, Context ctx) { } - public void InitialUpdater(ref Key key, ref Input input, ref Value value) { } + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { } - public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) => true; + public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; - public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) => true; + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } - public void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) { } + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// /// No reads during compaction diff --git a/cs/src/core/Index/Interfaces/FunctionsBase.cs b/cs/src/core/Index/Interfaces/FunctionsBase.cs index f552ae8cd..6ceabb3d9 100644 --- a/cs/src/core/Index/Interfaces/FunctionsBase.cs +++ b/cs/src/core/Index/Interfaces/FunctionsBase.cs @@ -32,18 +32,18 @@ public virtual void SingleReader(ref Key key, ref Input input, ref Value value, public virtual void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src; /// - public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value) { } + public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { } /// - public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) => true; + public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; /// - public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) { } + public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { } /// - public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) => true; + public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; /// public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// - public virtual void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) { } + public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { } /// @@ -93,16 +93,16 @@ public SimpleFunctions(bool locking = false) : base(locking) { } public override void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src; /// - public override void InitialUpdater(ref Key key, ref Value input, ref Value value) => value = input; + public override void InitialUpdater(ref Key key, ref Value input, ref Value value, ref Value output) => value = input; /// - public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue) => newValue = merger(input, oldValue); + public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue, ref Value output) => newValue = merger(input, oldValue); /// - public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value) { value = merger(input, value); return true; } + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output) { value = merger(input, value); return true; } /// public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status) { } /// - public override void RMWCompletionCallback(ref Key key, ref Value input, Context ctx, Status status) { } + public override void RMWCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status) { } /// public override void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { } /// @@ -142,13 +142,13 @@ public virtual void SingleReader(ref Key key, ref Input input, ref Value value, public virtual void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src; /// - public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value) { } + public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { } /// - public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) => true; + public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; /// - public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) { } + public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { } /// - public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address) => true; + public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true; /// public virtual void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) { } @@ -156,7 +156,7 @@ public virtual void ConcurrentDeleter(ref Key key, ref Value value, ref RecordIn /// public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordInfo recordInfo) { } /// - public virtual void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) { } + public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { } /// public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { } /// @@ -206,16 +206,16 @@ public AdvancedSimpleFunctions(bool locking = false) : base(locking) { } public override void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src; /// - public override void InitialUpdater(ref Key key, ref Value input, ref Value value) => value = input; + public override void InitialUpdater(ref Key key, ref Value input, ref Value value, ref Value output) => value = input; /// - public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue) => newValue = merger(input, oldValue); + public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue, ref Value output) => newValue = merger(input, oldValue); /// - public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref RecordInfo recordInfo, long address) { value = merger(input, value); return true; } + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) { value = merger(input, value); return true; } /// public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status, RecordInfo recordInfo) { } /// - public override void RMWCompletionCallback(ref Key key, ref Value input, Context ctx, Status status) { } + public override void RMWCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status) { } /// public override void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { } /// diff --git a/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs b/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs index e590637f6..50c702786 100644 --- a/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs +++ b/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs @@ -37,9 +37,10 @@ public interface IAdvancedFunctions /// /// The key for this record /// The user input that was used to perform the modification + /// The result of the RMW operation; if this is a struct, then it will be a temporary and should be copied to /// The application context passed through the pending operation /// The result of the pending operation - void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status); + void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status); /// /// Delete completion @@ -61,7 +62,8 @@ public interface IAdvancedFunctions /// The key for this record /// The user input to be used for computing the updated /// The destination to be updated; because this is an insert, there is no previous value there. - void InitialUpdater(ref Key key, ref Input input, ref Value value); + /// The location where the result of the operation on is to be copied + void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output); /// /// Whether we need to invoke copy-update for RMW @@ -69,7 +71,8 @@ public interface IAdvancedFunctions /// The key for this record /// The user input to be used for computing the updated value /// The existing value that would be copied. - bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) + /// The location where the result of the operation on is to be copied + bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) #if NETSTANDARD2_1 || NET => true #endif @@ -82,7 +85,8 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) /// The user input to be used for computing from /// The previous value to be copied/updated /// The destination to be updated; because this is an copy to a new location, there is no previous value there. - void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue); + /// The location where is to be copied + void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output); /// /// In-place update for RMW @@ -90,9 +94,10 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) /// The key for this record /// The user input to be used for computing the updated /// The destination to be updated; because this is an in-place update, there is a previous value there. + /// The location where the result of the operation on is to be copied /// A reference to the header of the record; may be used by /// The logical address of the record being updated; used as a RecordId by indexing - bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address); + bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address); /// /// Non-concurrent reader. diff --git a/cs/src/core/Index/Interfaces/IFunctions.cs b/cs/src/core/Index/Interfaces/IFunctions.cs index a0abbb9d2..005f07b8c 100644 --- a/cs/src/core/Index/Interfaces/IFunctions.cs +++ b/cs/src/core/Index/Interfaces/IFunctions.cs @@ -36,9 +36,10 @@ public interface IFunctions /// /// The key for this record /// The user input that was used to perform the modification + /// The result of the RMW operation; if this is a struct, then it will be a temporary and should be copied to /// The application context passed through the pending operation /// The result of the pending operation - void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status); + void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status); /// /// Delete completion @@ -60,7 +61,8 @@ public interface IFunctions /// The key for this record /// The user input to be used for computing the updated /// The destination to be updated; because this is an insert, there is no previous value there. - void InitialUpdater(ref Key key, ref Input input, ref Value value); + /// The location where the result of the operation on is to be copied + void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output); /// /// Whether we need to invoke copy-update for RMW @@ -68,7 +70,8 @@ public interface IFunctions /// The key for this record /// The user input to be used for computing the updated value /// The existing value that would be copied. - bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) + /// The location where the result of the operation on is to be copied + bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) #if NETSTANDARD2_1 || NET => true #endif @@ -81,7 +84,8 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) /// The user input to be used for computing from /// The previous value to be copied/updated /// The destination to be updated; because this is an copy to a new location, there is no previous value there. - void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue); + /// The location where is to be copied + void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output); /// /// In-place update for RMW @@ -89,7 +93,8 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue) /// The key for this record /// The user input to be used for computing the updated /// The destination to be updated; because this is an in-place update, there is a previous value there. - bool InPlaceUpdater(ref Key key, ref Input input, ref Value value); + /// The location where the result of the operation on is to be copied + bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output); /// /// Non-concurrent reader. diff --git a/cs/src/core/Index/Interfaces/TryAddFunctions.cs b/cs/src/core/Index/Interfaces/TryAddFunctions.cs index 6f454b9e6..3d69f3d6f 100644 --- a/cs/src/core/Index/Interfaces/TryAddFunctions.cs +++ b/cs/src/core/Index/Interfaces/TryAddFunctions.cs @@ -14,9 +14,9 @@ namespace FASTER.core public class TryAddFunctions : SimpleFunctions { /// - public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value) => true; + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output) => true; /// - public override bool NeedCopyUpdate(ref Key key, ref Value input, ref Value oldValue) => false; + public override bool NeedCopyUpdate(ref Key key, ref Value input, ref Value oldValue, ref Value output) => false; } /// diff --git a/cs/src/core/VarLen/MemoryFunctions.cs b/cs/src/core/VarLen/MemoryFunctions.cs index d99a7bd4c..4b3c593cd 100644 --- a/cs/src/core/VarLen/MemoryFunctions.cs +++ b/cs/src/core/VarLen/MemoryFunctions.cs @@ -66,19 +66,19 @@ public override void ConcurrentReader(ref Key key, ref Memory input, ref Memo } /// - public override void InitialUpdater(ref Key key, ref Memory input, ref Memory value) + public override void InitialUpdater(ref Key key, ref Memory input, ref Memory value, ref (IMemoryOwner, int) output) { input.CopyTo(value); } /// - public override void CopyUpdater(ref Key key, ref Memory input, ref Memory oldValue, ref Memory newValue) + public override void CopyUpdater(ref Key key, ref Memory input, ref Memory oldValue, ref Memory newValue, ref (IMemoryOwner, int) output) { oldValue.CopyTo(newValue); } /// - public override bool InPlaceUpdater(ref Key key, ref Memory input, ref Memory value) + public override bool InPlaceUpdater(ref Key key, ref Memory input, ref Memory value, ref (IMemoryOwner, int) output) { // The default implementation of IPU simply writes input to destination, if there is space return ConcurrentWriter(ref key, ref input, ref value); diff --git a/cs/src/core/VarLen/SpanByteFunctions.cs b/cs/src/core/VarLen/SpanByteFunctions.cs index bb211d1ee..704554ba8 100644 --- a/cs/src/core/VarLen/SpanByteFunctions.cs +++ b/cs/src/core/VarLen/SpanByteFunctions.cs @@ -49,19 +49,19 @@ public override bool ConcurrentWriter(ref Key key, ref SpanByte src, ref SpanByt } /// - public override void InitialUpdater(ref Key key, ref SpanByte input, ref SpanByte value) + public override void InitialUpdater(ref Key key, ref SpanByte input, ref SpanByte value, ref Output output) { input.CopyTo(ref value); } /// - public override void CopyUpdater(ref Key key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue) + public override void CopyUpdater(ref Key key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref Output output) { oldValue.CopyTo(ref newValue); } /// - public override bool InPlaceUpdater(ref Key key, ref SpanByte input, ref SpanByte value) + public override bool InPlaceUpdater(ref Key key, ref SpanByte input, ref SpanByte value, ref Output output) { // The default implementation of IPU simply writes input to destination, if there is space return ConcurrentWriter(ref key, ref input, ref value); diff --git a/cs/test/AsyncTests.cs b/cs/test/AsyncTests.cs index 63f8df48d..8244bfc71 100644 --- a/cs/test/AsyncTests.cs +++ b/cs/test/AsyncTests.cs @@ -123,17 +123,17 @@ public override void ReadCompletionCallback(ref AdId key, ref AdInput input, ref public override void ConcurrentReader(ref AdId key, ref AdInput input, ref NumClicks value, ref Output dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value) => value = input.numClicks; + public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) => value = input.numClicks; - public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true; + public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref Output output) => true; - public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks; } diff --git a/cs/test/BasicFASTERTests.cs b/cs/test/BasicFASTERTests.cs index 13fe45228..b9cd3f12c 100644 --- a/cs/test/BasicFASTERTests.cs +++ b/cs/test/BasicFASTERTests.cs @@ -275,6 +275,7 @@ public unsafe void TestShiftHeadAddress() public unsafe void NativeInMemRMWRefKeys() { InputStruct input = default; + OutputStruct output = default; var nums = Enumerable.Range(0, 1000).ToArray(); var rnd = new Random(11); @@ -298,10 +299,17 @@ public unsafe void NativeInMemRMWRefKeys() var i = nums[j]; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; input = new InputStruct { ifield1 = i, ifield2 = i + 1 }; - session.RMW(ref key1, ref input, Empty.Default, 0); + if (session.RMW(ref key1, ref input, ref output, Empty.Default, 0) == Status.PENDING) + { + session.CompletePending(true); + } + else + { + Assert.AreEqual(2 * i, output.value.vfield1); + Assert.AreEqual(2 * (i + 1), output.value.vfield2); + } } - OutputStruct output = default; Status status; KeyStruct key; diff --git a/cs/test/CompletePendingTests.cs b/cs/test/CompletePendingTests.cs index 7d4afa0ff..5d3b6ffcd 100644 --- a/cs/test/CompletePendingTests.cs +++ b/cs/test/CompletePendingTests.cs @@ -25,9 +25,10 @@ public void Setup() [TearDown] public void TearDown() { - fht.Dispose(); + fht?.Dispose(); fht = null; - log.Dispose(); + log?.Dispose(); + log = null; } const int numRecords = 1000; @@ -38,15 +39,16 @@ public void TearDown() static InputStruct NewInputStruct(int key) => new InputStruct { ifield1 = key + numRecords * 30, ifield2 = key + numRecords * 40 }; static ContextStruct NewContextStruct(int key) => new ContextStruct { cfield1 = key + numRecords * 50, cfield2 = key + numRecords * 60 }; - static void VerifyStructs(int key, ref KeyStruct keyStruct, ref InputStruct inputStruct, ref OutputStruct outputStruct, ref ContextStruct contextStruct) + static void VerifyStructs(int key, ref KeyStruct keyStruct, ref InputStruct inputStruct, ref OutputStruct outputStruct, ref ContextStruct contextStruct, bool useRMW) { Assert.AreEqual(key, keyStruct.kfield1); Assert.AreEqual(key + numRecords * 10, keyStruct.kfield2); Assert.AreEqual(key + numRecords * 30, inputStruct.ifield1); Assert.AreEqual(key + numRecords * 40, inputStruct.ifield2); - Assert.AreEqual(key, outputStruct.value.vfield1); - Assert.AreEqual(key + numRecords * 10, outputStruct.value.vfield2); + // RMW causes the InPlaceUpdater to be called, which adds input fields to the value. + Assert.AreEqual(key + (useRMW ? inputStruct.ifield1 : 0), outputStruct.value.vfield1); + Assert.AreEqual(key + numRecords * 10 + (useRMW ? inputStruct.ifield2 : 0), outputStruct.value.vfield2); Assert.AreEqual(key + numRecords * 50, contextStruct.cfield1); Assert.AreEqual(key + numRecords * 60, contextStruct.cfield2); } @@ -77,7 +79,7 @@ internal bool DeferPending() return false; } - internal void Process(CompletedOutputIterator completedOutputs) + internal void Process(CompletedOutputIterator completedOutputs, bool useRMW) { Assert.AreEqual(CompletedOutputIterator.kInitialAlloc * CompletedOutputIterator.kReallocMultuple, completedOutputs.vector.Length); @@ -88,7 +90,7 @@ internal void Process(CompletedOutputIterator()).NewSession>(); Assert.IsNull(session.completedOutputs); // Do not instantiate until we need it @@ -158,8 +160,10 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool isAsync) } } - // We don't use input or context, but we test that they were carried through correctly. - var status = session.Read(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct); + // We don't use context (though we verify it), and Read does not use input. + var status = useRMW + ? session.RMW(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct) + : session.Read(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct); if (status == Status.PENDING) { if (processPending.IsFirst()) @@ -175,18 +179,18 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool isAsync) completedOutputs = await session.CompletePendingWithOutputsAsync(); else session.CompletePendingWithOutputs(out completedOutputs, wait: true); - processPending.Process(completedOutputs); + processPending.Process(completedOutputs, useRMW); } continue; } - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); } processPending.VerifyNoDeferredPending(); } [Test] [Category("FasterKV")] - public async ValueTask AdvReadAndCompleteWithPendingOutput([Values]bool isAsync) + public async ValueTask AdvReadAndCompleteWithPendingOutput([Values] bool useRMW, [Values]bool isAsync) { using var session = fht.For(new AdvancedFunctionsWithContext()).NewSession>(); Assert.IsNull(session.completedOutputs); // Do not instantiate until we need it @@ -226,8 +230,10 @@ public async ValueTask AdvReadAndCompleteWithPendingOutput([Values]bool isAsync) } } - // We don't use input or context, but we test that they were carried through correctly. - var status = session.Read(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct); + // We don't use context (though we verify it), and Read does not use input. + var status = useRMW + ? session.RMW(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct) + : session.Read(ref keyStruct, ref inputStruct, ref outputStruct, contextStruct); if (status == Status.PENDING) { if (processPending.IsFirst()) @@ -243,11 +249,11 @@ public async ValueTask AdvReadAndCompleteWithPendingOutput([Values]bool isAsync) completedOutputs = await session.CompletePendingWithOutputsAsync(); else session.CompletePendingWithOutputs(out completedOutputs, wait: true); - processPending.Process(completedOutputs); + processPending.Process(completedOutputs, useRMW); } continue; } - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); } processPending.VerifyNoDeferredPending(); } diff --git a/cs/test/FunctionPerSessionTests.cs b/cs/test/FunctionPerSessionTests.cs index 0ba546ac1..d89695caf 100644 --- a/cs/test/FunctionPerSessionTests.cs +++ b/cs/test/FunctionPerSessionTests.cs @@ -21,7 +21,7 @@ public class RefCountedAdder : FunctionsBase Increment(ref dst); - public override bool InPlaceUpdater(ref int key, ref int input, ref int value, ref RecordInfo recordInfo, long address) => Increment(ref value); + public override bool InPlaceUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) => Increment(ref value); public override bool SupportsLocking => true; public override void Lock(ref RecordInfo recordInfo, ref int key, ref int value, LockType lockType, ref long lockContext) => recordInfo.SpinLock(); diff --git a/cs/test/MemoryLogCompactionTests.cs b/cs/test/MemoryLogCompactionTests.cs index 55215f9d8..50b69edc9 100644 --- a/cs/test/MemoryLogCompactionTests.cs +++ b/cs/test/MemoryLogCompactionTests.cs @@ -118,7 +118,7 @@ public void MemoryLogCompactionTest1() public class MemoryCompaction : MemoryFunctions, int, int> { - public override void RMWCompletionCallback(ref ReadOnlyMemory key, ref Memory input, int ctx, Status status) + public override void RMWCompletionCallback(ref ReadOnlyMemory key, ref Memory input, ref (IMemoryOwner, int) output, int ctx, Status status) { Assert.IsTrue(status == Status.OK); } diff --git a/cs/test/NativeReadCacheTests.cs b/cs/test/NativeReadCacheTests.cs index d55e4a72e..884d8b73a 100644 --- a/cs/test/NativeReadCacheTests.cs +++ b/cs/test/NativeReadCacheTests.cs @@ -108,7 +108,6 @@ public void NativeDiskWriteReadCache() Assert.IsTrue(output.value.vfield1 == value.vfield1); Assert.IsTrue(output.value.vfield2 == value.vfield2); } - // Upsert to overwrite the read cache for (int i = 1900; i < 1950; i++) @@ -121,11 +120,19 @@ public void NativeDiskWriteReadCache() // RMW to overwrite the read cache for (int i = 1950; i < 2000; i++) { + OutputStruct output = default; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; input = new InputStruct { ifield1 = 1, ifield2 = 1 }; - var status = session.RMW(ref key1, ref input, Empty.Default, 0); + var status = session.RMW(ref key1, ref input, ref output, Empty.Default, 0); if (status == Status.PENDING) + { session.CompletePending(true); + } + else + { + Assert.IsTrue(output.value.vfield1 == i + 1); + Assert.IsTrue(output.value.vfield2 == i + 2); + } } // Read 100 keys @@ -140,7 +147,6 @@ public void NativeDiskWriteReadCache() Assert.IsTrue(output.value.vfield1 == value.vfield1); Assert.IsTrue(output.value.vfield2 == value.vfield2); } - } [Test] diff --git a/cs/test/NeedCopyUpdateTests.cs b/cs/test/NeedCopyUpdateTests.cs index ccf388756..8ca61eff8 100644 --- a/cs/test/NeedCopyUpdateTests.cs +++ b/cs/test/NeedCopyUpdateTests.cs @@ -111,18 +111,18 @@ public override void Deserialize(out RMWValue value) internal class TryAddTestFunctions : TryAddFunctions { - public override void InitialUpdater(ref int key, ref RMWValue input, ref RMWValue value) + public override void InitialUpdater(ref int key, ref RMWValue input, ref RMWValue value, ref RMWValue output) { input.flag = true; - base.InitialUpdater(ref key, ref input, ref value); + base.InitialUpdater(ref key, ref input, ref value, ref output); } - public override void CopyUpdater(ref int key, ref RMWValue input, ref RMWValue oldValue, ref RMWValue newValue) + public override void CopyUpdater(ref int key, ref RMWValue input, ref RMWValue oldValue, ref RMWValue newValue, ref RMWValue output) { Assert.Fail("CopyUpdater"); } - public override void RMWCompletionCallback(ref int key, ref RMWValue input, Status ctx, Status status) + public override void RMWCompletionCallback(ref int key, ref RMWValue input, ref RMWValue output, Status ctx, Status status) { Assert.IsTrue(status == ctx); diff --git a/cs/test/ObjectRecoveryTest2.cs b/cs/test/ObjectRecoveryTest2.cs index e2af46b73..3192d3508 100644 --- a/cs/test/ObjectRecoveryTest2.cs +++ b/cs/test/ObjectRecoveryTest2.cs @@ -227,10 +227,10 @@ internal void FinalizeRead(ref Status status, ref MyOutput g1) public class MyFunctions : FunctionsBase { - public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) => value.value = input.value; - public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true; - public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) => newValue = oldValue; - public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value) + public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) => value.value = input.value; + public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyOutput output) => true; + public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) => newValue = oldValue; + public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { if (value.value.Length < input.value.Length) return false; diff --git a/cs/test/ObjectRecoveryTestTypes.cs b/cs/test/ObjectRecoveryTestTypes.cs index 642c87e80..9954d08a6 100644 --- a/cs/test/ObjectRecoveryTestTypes.cs +++ b/cs/test/ObjectRecoveryTestTypes.cs @@ -76,17 +76,17 @@ public class Functions : FunctionsBase public override void ConcurrentReader(ref AdId key, ref Input input, ref NumClicks value, ref Output dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value) => value = input.numClicks; + public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output) => value = input.numClicks; - public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override bool NeedCopyUpdate(ref AdId key, ref Input input, ref NumClicks oldValue) => true; + public override bool NeedCopyUpdate(ref AdId key, ref Input input, ref NumClicks oldValue, ref Output output) => true; - public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue = new NumClicks { numClicks = oldValue.numClicks + input.numClicks.numClicks }; } diff --git a/cs/test/ObjectTestTypes.cs b/cs/test/ObjectTestTypes.cs index 3e2ea2261..223a98ba1 100644 --- a/cs/test/ObjectTestTypes.cs +++ b/cs/test/ObjectTestTypes.cs @@ -77,20 +77,20 @@ public class MyOutput public class MyFunctions : FunctionsBase { - public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) + public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value = new MyValue { value = input.value }; } - public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value) + public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value.value += input.value; return true; } - public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true; + public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyOutput output) => true; - public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) + public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) { newValue = new MyValue { value = oldValue.value + input.value }; } @@ -115,7 +115,7 @@ public override void ReadCompletionCallback(ref MyKey key, ref MyInput input, re Assert.IsTrue(key.key == output.value.value); } - public override void RMWCompletionCallback(ref MyKey key, ref MyInput input, Empty ctx, Status status) + public override void RMWCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutput output, Empty ctx, Status status) { Assert.IsTrue(status == Status.OK); } @@ -135,20 +135,20 @@ public override void SingleWriter(ref MyKey key, ref MyValue src, ref MyValue ds public class MyFunctionsDelete : FunctionsBase { - public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value) + public override void InitialUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value = new MyValue { value = input.value }; } - public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value) + public override bool InPlaceUpdater(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput output) { value.value += input.value; return true; } - public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue) => true; + public override bool NeedCopyUpdate(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyOutput output) => true; - public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) + public override void CopyUpdater(ref MyKey key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) { newValue = new MyValue { value = oldValue.value + input.value }; } @@ -179,7 +179,7 @@ public override void ReadCompletionCallback(ref MyKey key, ref MyInput input, re } } - public override void RMWCompletionCallback(ref MyKey key, ref MyInput input, int ctx, Status status) + public override void RMWCompletionCallback(ref MyKey key, ref MyInput input, ref MyOutput output, int ctx, Status status) { if (ctx == 0) Assert.IsTrue(status == Status.OK); @@ -203,20 +203,20 @@ public override void SingleWriter(ref MyKey key, ref MyValue src, ref MyValue ds public class MixedFunctions : FunctionsBase { - public override void InitialUpdater(ref int key, ref MyInput input, ref MyValue value) + public override void InitialUpdater(ref int key, ref MyInput input, ref MyValue value, ref MyOutput output) { value = new MyValue { value = input.value }; } - public override bool InPlaceUpdater(ref int key, ref MyInput input, ref MyValue value) + public override bool InPlaceUpdater(ref int key, ref MyInput input, ref MyValue value, ref MyOutput output) { value.value += input.value; return true; } - public override bool NeedCopyUpdate(ref int key, ref MyInput input, ref MyValue oldValue) => true; + public override bool NeedCopyUpdate(ref int key, ref MyInput input, ref MyValue oldValue, ref MyOutput output) => true; - public override void CopyUpdater(ref int key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue) + public override void CopyUpdater(ref int key, ref MyInput input, ref MyValue oldValue, ref MyValue newValue, ref MyOutput output) { newValue = new MyValue { value = oldValue.value + input.value }; } diff --git a/cs/test/ReadAddressTests.cs b/cs/test/ReadAddressTests.cs index 913f05e1e..30079549d 100644 --- a/cs/test/ReadAddressTests.cs +++ b/cs/test/ReadAddressTests.cs @@ -110,7 +110,7 @@ public override void ReadCompletionCallback(ref Key key, ref Value input, ref Va } } - public override void RMWCompletionCallback(ref Key key, ref Value input, Context ctx, Status status) + public override void RMWCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status) { if (ctx is not null) { @@ -118,7 +118,7 @@ public override void RMWCompletionCallback(ref Key key, ref Value input, Context ctx.recordInfo = default; ctx.status = status; } - base.RMWCompletionCallback(ref key, ref input, ctx, status); + base.RMWCompletionCallback(ref key, ref input, ref output, ctx, status); } } diff --git a/cs/test/RecoverContinueTests.cs b/cs/test/RecoverContinueTests.cs index 9f29c463e..ed3af26d9 100644 --- a/cs/test/RecoverContinueTests.cs +++ b/cs/test/RecoverContinueTests.cs @@ -168,20 +168,20 @@ public override void ReadCompletionCallback(ref AdId key, ref AdInput input, ref public override void ConcurrentReader(ref AdId key, ref AdInput input, ref NumClicks value, ref Output dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { value = input.numClicks; } - public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true; + public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref Output output) => true; - public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks; } diff --git a/cs/test/RecoveryTestTypes.cs b/cs/test/RecoveryTestTypes.cs index 14f5e165c..2bbf26796 100644 --- a/cs/test/RecoveryTestTypes.cs +++ b/cs/test/RecoveryTestTypes.cs @@ -58,20 +58,20 @@ public override void ConcurrentReader(ref AdId key, ref AdInput input, ref NumCl } // RMW functions - public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { value = input.numClicks; } - public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true; + public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref Output output) => true; - public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks; } diff --git a/cs/test/SimpleAsyncTests.cs b/cs/test/SimpleAsyncTests.cs index 76c0e6de4..16b4135c9 100644 --- a/cs/test/SimpleAsyncTests.cs +++ b/cs/test/SimpleAsyncTests.cs @@ -10,7 +10,6 @@ namespace FASTER.test.async { - [TestFixture] public class SimpleAsyncTests { @@ -124,7 +123,8 @@ public async Task ReadAsyncRefKeyRefInputTest() for (key = 0; key < numOps; key++) { (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output); } key = 0; @@ -136,7 +136,8 @@ public async Task ReadAsyncRefKeyRefInputTest() (await t2).Complete(); // should trigger RMW re-do (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key + input + input); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key + input + input, output); } @@ -148,16 +149,19 @@ public async Task ReadAsyncNoRefKeyNoRefInputTest() Status status; long key = default, input = default, output = default; - using var s1 = fht1.NewSession(new SimpleFunctions((a, b) => a + b)); + using var s1 = fht1.NewSession(new RMWSimpleFunctions((a, b) => a + b)); for (key = 0; key < numOps; key++) { - (await s1.RMWAsync(ref key, ref key,Empty.Default)).Complete(); + (status, output) = (await s1.RMWAsync(ref key, ref key, Empty.Default)).Complete(); + Assert.AreNotEqual(Status.PENDING, status); + Assert.AreEqual(key, output); } for (key = 0; key < numOps; key++) { (status, output) = (await s1.ReadAsync(key, output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output); } key = 0; @@ -169,7 +173,8 @@ public async Task ReadAsyncNoRefKeyNoRefInputTest() (await t2).Complete(); // should trigger RMW re-do (status, output) = (await s1.ReadAsync(key, output,Empty.Default, 129)).Complete(); - Assert.IsTrue(status == Status.OK && output == key + input + input); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key + input + input, output); } // Test that does .UpsertAsync, .ReadAsync, .DeleteAsync, .ReadAsync with minimum parameters passed by reference (ref key) @@ -234,8 +239,6 @@ public async Task UpsertReadDeleteReadAsyncMinParamByValueTest() } } - /* ** TO DO: Using StartAddress in ReadAsync is now obsolete - might be design change etc but until then, commenting out test ** - * // Test that uses StartAddress parameter // (ref key, ref input, StartAddress, userContext, serialNo, CancellationToken) [Test] @@ -243,20 +246,29 @@ public async Task UpsertReadDeleteReadAsyncMinParamByValueTest() public async Task AsyncStartAddressParamTest() { Status status; - CancellationToken cancellationToken; long key = default, input = default, output = default; - var readAtAddress = fht1.Log.BeginAddress; - using var s1 = fht1.NewSession(new SimpleFunctions((a, b) => a + b)); + var addresses = new long[numOps]; + long recordSize = fht1.Log.FixedRecordSize; + + using var s1 = fht1.NewSession(new AdvancedRMWSimpleFunctions((a, b) => a + b)); for (key = 0; key < numOps; key++) { - (await s1.RMWAsync(ref key, ref key)).Complete(); + // We can predict the address as TailAddress because we're single-threaded, *unless* a page was allocated; + // in that case the new address is at the start of the newly-allocated page. Since we can't predict that, + // we take advantage of knowing we have fixed-length records and that TailAddress is open-ended, so we + // subtract after the insert to get record start address. + (status, output) = (await s1.RMWAsync(ref key, ref key)).Complete(); + addresses[key] = fht1.Log.TailAddress - recordSize; + Assert.AreNotEqual(Status.PENDING, status); + Assert.AreEqual(key, output); } for (key = 0; key < numOps; key++) { - (status, output) = (await s1.ReadAsync(ref key, ref output, readAtAddress, ReadFlags.None)).Complete(); - Assert.IsTrue(status == Status.OK && output == key); + (status, output) = (await s1.ReadAsync(ref key, ref output, addresses[key], ReadFlags.None)).Complete(); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output); } key = 0; @@ -267,10 +279,14 @@ public async Task AsyncStartAddressParamTest() (await t1).Complete(); (await t2).Complete(); // should trigger RMW re-do - (status, output) = (await s1.ReadAsync(ref key, ref output, readAtAddress, ReadFlags.None, Empty.Default, 129, cancellationToken)).Complete(); - Assert.IsTrue(status == Status.OK && output == key + input + input); + // Because of our small log-memory size, RMW of key 0 causes an RCW (Read-Copy-Write) and an insertion at the tail + // of the log. Use the same pattern as above to get the new record address. + addresses[key] = fht1.Log.TailAddress - recordSize; + + (status, output) = (await s1.ReadAsync(ref key, ref output, addresses[key], ReadFlags.None, Empty.Default, 129)).Complete(); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key + input + input, output); } - */ // Test of RMWAsync where No ref used [Test] @@ -280,17 +296,19 @@ public async Task ReadAsyncRMWAsyncNoRefTest() Status status; long key = default, input = default, output = default; - using var s1 = fht1.NewSession(new SimpleFunctions((a, b) => a + b)); + using var s1 = fht1.NewSession(new RMWSimpleFunctions((a, b) => a + b)); for (key = 0; key < numOps; key++) { - status = (await s1.RMWAsync(key, key)).Complete(); - Assert.AreNotEqual(Status.PENDING, status); + var asyncResult = await (await s1.RMWAsync(key, key)).CompleteAsync(); + Assert.AreNotEqual(Status.PENDING, asyncResult.Status); + Assert.AreEqual(key, asyncResult.Output); } for (key = 0; key < numOps; key++) { (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output); } key = 0; @@ -302,7 +320,8 @@ public async Task ReadAsyncRMWAsyncNoRefTest() (await t2).Complete(); // should trigger RMW re-do (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key + input + input); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key + input + input, output); } // Test of ReadyToCompletePendingAsync @@ -325,7 +344,8 @@ public async Task ReadyToCompletePendingAsyncTest() for (key = 0; key < numOps; key++) { (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output); } key = 0; @@ -337,7 +357,8 @@ public async Task ReadyToCompletePendingAsyncTest() (await t2).Complete(); // should trigger RMW re-do (status, output) = (await s1.ReadAsync(ref key, ref output)).Complete(); - Assert.IsTrue(status == Status.OK && output == key + input + input); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key + input + input, output); } // Test that does both UpsertAsync and RMWAsync to populate the FasterKV and update it, possibly after flushing it from memory. diff --git a/cs/test/SimpleRecoveryTest.cs b/cs/test/SimpleRecoveryTest.cs index de90af43b..7f7289107 100644 --- a/cs/test/SimpleRecoveryTest.cs +++ b/cs/test/SimpleRecoveryTest.cs @@ -278,20 +278,20 @@ public override void ReadCompletionCallback(ref AdId key, ref AdInput input, ref public override void ConcurrentReader(ref AdId key, ref AdInput input, ref NumClicks value, ref Output dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override void InitialUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { value = input.numClicks; } - public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value) + public override bool InPlaceUpdater(ref AdId key, ref AdInput input, ref NumClicks value, ref Output output) { Interlocked.Add(ref value.numClicks, input.numClicks.numClicks); return true; } - public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue) => true; + public override bool NeedCopyUpdate(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref Output output) => true; - public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue) + public override void CopyUpdater(ref AdId key, ref AdInput input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output) { newValue.numClicks += oldValue.numClicks + input.numClicks.numClicks; } diff --git a/cs/test/TestTypes.cs b/cs/test/TestTypes.cs index 541e05d02..39e174a7a 100644 --- a/cs/test/TestTypes.cs +++ b/cs/test/TestTypes.cs @@ -2,15 +2,8 @@ // Licensed under the MIT license. using System; -using System.Text; using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Linq; using FASTER.core; -using System.Runtime.CompilerServices; -using System.IO; -using System.Diagnostics; using NUnit.Framework; namespace FASTER.test @@ -64,16 +57,18 @@ public class Functions : FunctionsWithContext public class FunctionsWithContext : FunctionsBase { - public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, TContext ctx, Status status) + public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, TContext ctx, Status status) { - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1 + input.ifield1, output.value.vfield1); + Assert.AreEqual(key.kfield2 + input.ifield2, output.value.vfield2); } public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, TContext ctx, Status status) { - Assert.IsTrue(status == Status.OK); - Assert.IsTrue(output.value.vfield1 == key.kfield1); - Assert.IsTrue(output.value.vfield2 == key.kfield2); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1, output.value.vfield1); + Assert.AreEqual(key.kfield2, output.value.vfield2); } // Read functions @@ -82,25 +77,28 @@ public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct i public override void ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 = input.ifield1; value.vfield2 = input.ifield2; + output.value = value; } - public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 += input.ifield1; value.vfield2 += input.ifield2; + output.value = value; return true; } - public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue) => true; + public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref OutputStruct output) => true; - public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue) + public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue, ref OutputStruct output) { newValue.vfield1 = oldValue.vfield1 + input.ifield1; newValue.vfield2 = oldValue.vfield2 + input.ifield2; + output.value = newValue; } } @@ -110,16 +108,18 @@ public class AdvancedFunctions : AdvancedFunctionsWithContext public class AdvancedFunctionsWithContext : AdvancedFunctionsBase { - public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, TContext ctx, Status status) + public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, TContext ctx, Status status) { - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1 + input.ifield1, output.value.vfield1); + Assert.AreEqual(key.kfield2 + input.ifield2, output.value.vfield2); } public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, TContext ctx, Status status, RecordInfo recordInfo) { - Assert.IsTrue(status == Status.OK); - Assert.IsTrue(output.value.vfield1 == key.kfield1); - Assert.IsTrue(output.value.vfield2 == key.kfield2); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1, output.value.vfield1); + Assert.AreEqual(key.kfield2, output.value.vfield2); } // Read functions @@ -128,46 +128,49 @@ public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct i public override void ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref RecordInfo recordInfo, long address) => dst.value = value; // RMW functions - public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 = input.ifield1; value.vfield2 = input.ifield2; + output.value = value; } - public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref RecordInfo recordInfo, long address) + public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output, ref RecordInfo recordInfo, long address) { value.vfield1 += input.ifield1; value.vfield2 += input.ifield2; + output.value = value; return true; } - public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue) => true; + public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref OutputStruct output) => true; - public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue) + public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue, ref OutputStruct output) { newValue.vfield1 = oldValue.vfield1 + input.ifield1; newValue.vfield2 = oldValue.vfield2 + input.ifield2; + output.value = newValue; } } public class FunctionsCompaction : FunctionsBase { - public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, int ctx, Status status) + public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, int ctx, Status status) { - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); } public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, int ctx, Status status) { if (ctx == 0) { - Assert.IsTrue(status == Status.OK); - Assert.IsTrue(output.value.vfield1 == key.kfield1); - Assert.IsTrue(output.value.vfield2 == key.kfield2); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1, output.value.vfield1); + Assert.AreEqual(key.kfield2, output.value.vfield2); } else { - Assert.IsTrue(status == Status.NOTFOUND); + Assert.AreEqual(Status.NOTFOUND, status); } } @@ -177,22 +180,22 @@ public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct i public override void ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst) => dst.value = value; // RMW functions - public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 = input.ifield1; value.vfield2 = input.ifield2; } - public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 += input.ifield1; value.vfield2 += input.ifield2; return true; } - public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue) => true; + public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref OutputStruct output) => true; - public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue) + public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue, ref OutputStruct output) { newValue.vfield1 = oldValue.vfield1 + input.ifield1; newValue.vfield2 = oldValue.vfield2 + input.ifield2; @@ -207,16 +210,16 @@ public class FunctionsCopyOnWrite : FunctionsBase _concurrentWriterCallCount; public int InPlaceUpdaterCallCount => _inPlaceUpdaterCallCount; - public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, Empty ctx, Status status) + public override void RMWCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, Empty ctx, Status status) { - Assert.IsTrue(status == Status.OK); + Assert.AreEqual(Status.OK, status); } public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, Empty ctx, Status status) { - Assert.IsTrue(status == Status.OK); - Assert.IsTrue(output.value.vfield1 == key.kfield1); - Assert.IsTrue(output.value.vfield2 == key.kfield2); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key.kfield1, output.value.vfield1); + Assert.AreEqual(key.kfield2, output.value.vfield2); } // Read functions @@ -234,24 +237,76 @@ public override bool ConcurrentWriter(ref KeyStruct key, ref ValueStruct src, re } // RMW functions - public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override void InitialUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { value.vfield1 = input.ifield1; value.vfield2 = input.ifield2; } - public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value) + public override bool InPlaceUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct output) { Interlocked.Increment(ref _inPlaceUpdaterCallCount); return false; } - public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue) => true; + public override bool NeedCopyUpdate(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref OutputStruct output) => true; - public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue) + public override void CopyUpdater(ref KeyStruct key, ref InputStruct input, ref ValueStruct oldValue, ref ValueStruct newValue, ref OutputStruct output) { newValue.vfield1 = oldValue.vfield1 + input.ifield1; newValue.vfield2 = oldValue.vfield2 + input.ifield2; } } + + class RMWSimpleFunctions : SimpleFunctions + { + public RMWSimpleFunctions(Func merger) : base(merger) { } + + public override void InitialUpdater(ref Key key, ref Value input, ref Value value, ref Value output) + { + base.InitialUpdater(ref key, ref input, ref value, ref output); + output = input; + } + + /// + public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue, ref Value output) + { + base.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); + output = newValue; + } + + /// + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output) + { + base.InPlaceUpdater(ref key, ref input, ref value, ref output); + output = value; + return true; + } + } + + class AdvancedRMWSimpleFunctions : AdvancedSimpleFunctions + { + public AdvancedRMWSimpleFunctions(Func merger) : base(merger) { } + + public override void InitialUpdater(ref Key key, ref Value input, ref Value value, ref Value output) + { + base.InitialUpdater(ref key, ref input, ref value, ref output); + output = input; + } + + /// + public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue, ref Value output) + { + base.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); + output = newValue; + } + + /// + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) + { + base.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + output = value; + return true; + } + } } diff --git a/cs/test/VLTestTypes.cs b/cs/test/VLTestTypes.cs index 6c0628381..c27e266f9 100644 --- a/cs/test/VLTestTypes.cs +++ b/cs/test/VLTestTypes.cs @@ -107,7 +107,7 @@ public struct Input public class VLFunctions : FunctionsBase { - public override void RMWCompletionCallback(ref Key key, ref Input input, Empty ctx, Status status) + public override void RMWCompletionCallback(ref Key key, ref Input input, ref int[] output, Empty ctx, Status status) { Assert.IsTrue(status == Status.OK); } @@ -150,7 +150,7 @@ public override bool ConcurrentWriter(ref Key key, ref VLValue src, ref VLValue public class VLFunctions2 : FunctionsBase { - public override void RMWCompletionCallback(ref VLValue key, ref Input input, Empty ctx, Status status) + public override void RMWCompletionCallback(ref VLValue key, ref Input input, ref int[] output, Empty ctx, Status status) { Assert.IsTrue(status == Status.OK); } diff --git a/docs/_docs/20-fasterkv-basics.md b/docs/_docs/20-fasterkv-basics.md index b46d39c6c..7602a5dc2 100644 --- a/docs/_docs/20-fasterkv-basics.md +++ b/docs/_docs/20-fasterkv-basics.md @@ -95,7 +95,7 @@ For session operations, the user provides an instance of a type that implements Apart from Key and Value, the IFunctions interface is defined on three additional types: 1. `Input`: This is the type of input provided to FASTER when calling Read or RMW. It may be regarded as a parameter for the Read or RMW operation. For example, with RMW, it may be the delta being accumulated into the value. -2. `Output`: This is the type of the output of a Read operation. The reader copies the relevant parts of the Value to Output. +2. `Output`: This is the type of the output of a Read or RMW operation. The reader or updater copies the relevant parts of the Value to Output. 3. `Context`: User-defined context for the operation. Use `Empty` if there is no context necesssary. `IFunctions<>` encapsulates all callbacks made by FASTER back to the caller, which are described next: @@ -103,7 +103,7 @@ Apart from Key and Value, the IFunctions interface is defined on three additiona 1. SingleReader and ConcurrentReader: These are used to read from the store values and copy them to Output. Single reader can assume that there are no concurrent operations on the record. 2. SingleWriter and ConcurrentWriter: These are used to write values to the store, from a source value. Single writer can assume that there are no concurrent operations on the record. 3. Completion callbacks: Called by FASTER when various operations complete after they have gone "pending" due to requiring IO. -4. RMW Updaters: There are three updaters that the user specifies, InitialUpdater, InPlaceUpdater, and CopyUpdater. Together, they are used to implement the RMW operation. +4. RMW Updaters: There are three updaters that the user specifies, InitialUpdater, InPlaceUpdater, and CopyUpdater. Together, they are used to implement the RMW operation and return the Output to the caller. There is also a NeedCopyUpdate() method that is called before appending a copied-and-updated record to the tail of the log; if it returns false, the record is not copied. 5. Locking: There is one property and two methods; if the SupportsLocking property returns true, then FASTER will call Lock and Unlock within a try/finally in the four concurrent callback methods: ConcurrentReader, ConcurrentWriter, ConcurrentDeleter (new in IAdvancedFunctions), and InPlaceUpdater. FunctionsBase illustrates the default implementation of Lock and Unlock as an exclusive lock using a bit in RecordInfo. FASTER also support an advanced callback functions API with more hooks. See @@ -123,7 +123,7 @@ An equivalent, but more optimized API requires you to specify the Functions type var session = store.For(new Functions()).NewSession(); ``` -You can then perform a sequence of read, upsert, and RMW operations on the session. FASTER supports synchronous versions of all operations, as well as async versions. While all methods exist in an async form, only read and RMW are generally expected to go async; upserts and deletes will only go async when it is necessary to wait on flush operations when appending records to the log. The basic forms of these operations are described below; additional overloads are available. +You can then perform a sequence of read, upsert, and RMW operations on the session. FASTER supports both synchronous and async versions of all operations. While all methods exist in an async form, only read and RMW are generally expected to go async; upserts and deletes will only go async when it is necessary to wait on flush operations when appending records to the log. The basic forms of these operations are described below; additional overloads are available. #### Read @@ -157,15 +157,17 @@ while (r.Status == Status.PENDING) ```cs // Sync var status = session.RMW(ref key, ref input); +var status = session.RMW(ref key, ref input, ref output); var status = session.RMW(ref key, ref input, context, serialNo); // Async with sync operation completion (completion may rarely go async) var status = (await session.RMWAsync(ref key, ref input)).Complete(); -// Fully async (completion may rarely go async) +// Fully async (completion may rarely go async and require multiple iterations) var r = await session.RMWAsync(ref key, ref input); while (r.Status == Status.PENDING) r = await r.CompleteAsync(); +Console.WriteLine(r.Output); ``` #### Delete @@ -277,8 +279,7 @@ public static void Test() s.Read(ref key, ref output); Debug.Assert(output == value); s.RMW(ref key, ref input); - s.RMW(ref key, ref input); - s.Read(ref key, ref output); + s.RMW(ref key, ref input, ref output); Debug.Assert(output == value + 20); } ``` diff --git a/docs/_docs/50-remote-basics.md b/docs/_docs/50-remote-basics.md index ed1e5784c..299976b8a 100644 --- a/docs/_docs/50-remote-basics.md +++ b/docs/_docs/50-remote-basics.md @@ -75,13 +75,13 @@ on `Input`, which can for example, have an enum operation ID inside `Input` for server should do during RMW, on `Value`: ```cs -void InitialUpdater(ref Key key, ref Input input, ref Value value) => value.value = input.value; -bool InPlaceUpdater(ref Key key, ref Input input, ref Value value) +void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => value.value = input.value; +bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) { Interlocked.Add(ref value.value, input.value); return true; } -public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) +public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) => newValue.value = input.value + oldValue.value; ``` @@ -173,8 +173,16 @@ session.Read(key: 23); session.CompletePending(true); ``` -The final read will produce a result that is `25+25` more than the older value of `10023`, for key `23`. +The final read will produce a result that is `25+25` more than the original value of `10023`, for key `23`. +We can also get the Output from RMW operations directly: + +```cs +session.RMW(23, 25); +session.CompletePending(true); +``` + +The RMW completion callback will verify a result that is `25+25+25` more than the original value of `10023`, for key `23`. #### Async Client API