Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Add Output to RMW #515

Merged
merged 6 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public struct Functions : IFunctions<Key, Value, Input, Output, Empty>

public Functions(bool locking) => this.locking = locking;

public void RMWCompletionCallback(ref Key key, ref Input input, Empty ctx, Status status)
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}

Expand Down Expand Up @@ -65,20 +65,20 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)

// RMW functions
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void InitialUpdater(ref Key key, ref Input input, ref Value value)
public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
{
value.value = input.value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value)
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
{
value.value += input.value;
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue)
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output)
{
newValue.value = input.value + oldValue.value;
}
Expand Down
21 changes: 16 additions & 5 deletions cs/benchmark/scripts/run_benchmark.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@
Number of seconds to run the experiment.
Used primarily to debug changes to this script or do a quick one-off run; the default is 30 seconds.

.PARAMETER NumThreads
.PARAMETER ThreadCount
Number of threads to use.
Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script.

.PARAMETER LockMode
Locking mode to use: 0 = No locking, 1 = RecordInfo locking
Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script.

.PARAMETER ReadPercentages
Keys the Operation to perform: An array of one or more of:
0 = No read (Upsert workload only)
100 = All reads
Between 0 and 100 = mix of reads and upserts
-1 = All RMWs

.PARAMETER UseRecover
Recover the FasterKV from a checkpoint of a previous run rather than loading it from data.
Used primarily to debug changes to this script or do a quick one-off run; the default is false.
Expand Down Expand Up @@ -63,9 +70,9 @@
Runs 3 directories.

.EXAMPLE
pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild <other args>"
pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -ReadPercentages -1 <other args>"

Clones the master branch to the .\master folder, the branch_with_my_changes to the branch_with_my_changes folder, and runs those with any <other args> specified.
Runs an RMW-only workload

.EXAMPLE
pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild <other args>"
Expand All @@ -76,9 +83,10 @@ param (
[Parameter(Mandatory=$true)] [string[]]$ExeDirs,
[Parameter(Mandatory=$false)] [int]$RunSeconds = 30,
[Parameter(Mandatory=$false)] [int]$ThreadCount = -1,
[Parameter(Mandatory=$false)] [int]$lockMode = -1,
[Parameter(Mandatory=$false)] [int]$LockMode = -1,
[Parameter(Mandatory=$false)] [int[]]$ReadPercentages,
[Parameter(Mandatory=$false)] [switch]$UseRecover,
[Parameter(Mandatory=$false)] [switch]$CloneAndBuild.
[Parameter(Mandatory=$false)] [switch]$CloneAndBuild,
[Parameter(Mandatory=$false)] [switch]$NetCore31
)

Expand Down Expand Up @@ -136,6 +144,9 @@ if ($ThreadCount -ge 0) {
if ($LockMode -ge 0) {
$lockModes = ($LockMode)
}
if ($ReadPercentages) {
$readPercents = $ReadPercentages
}
if ($UseRecover) {
$k = "-k"
}
Expand Down
6 changes: 3 additions & 3 deletions cs/playground/SumStore/SumStoreTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ public override void ConcurrentReader(ref AdId key, ref Input input, ref NumClic
}

// RMW functions
public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value)
public override void InitialUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output)
{
value = input.numClicks;
}

public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value)
public override bool InPlaceUpdater(ref AdId key, ref Input input, ref NumClicks value, ref Output output)
{
Interlocked.Add(ref value.numClicks, input.numClicks.numClicks);
return true;
}

public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue)
public override void CopyUpdater(ref AdId key, ref Input input, ref NumClicks oldValue, ref NumClicks newValue, ref Output output)
{
newValue.numClicks = oldValue.numClicks + input.numClicks.numClicks;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/benchmark/FASTER.benchmark/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void ReadCompletionCallback(ref Key key, ref Input input, ref Output outp
{
}

public void RMWCompletionCallback(ref Key key, ref Input input, Empty ctx, Status status)
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}

Expand Down
20 changes: 16 additions & 4 deletions cs/remote/samples/FixedLenClient/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,30 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo
{
if (ctx == 0)
{
if (status != Status.OK || key + 10000 != output)
throw new Exception("Incorrect read result");
var expected = key + 10000;
if (status != Status.OK || expected != output)
throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}");
}
else if (ctx == 1)
{
if (status != Status.OK || key + 10000 + 25 + 25 != output)
throw new Exception("Incorrect read result");
var expected = key + 10000 + 25 + 25;
if (status != Status.OK || expected != output)
throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}");
}
else
{
throw new Exception("Unexpected user context");
}
}

public override void RMWCompletionCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
if (ctx == 1)
{
var expected = key + 10000 + 25 + 25 + 25;
if (status != Status.OK || expected != output)
throw new Exception($"Incorrect read result for key {key}; expected = {expected}, actual = {output}");
}
}
}
}
37 changes: 24 additions & 13 deletions cs/remote/samples/FixedLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, F
session.Read(23, userContext: 1);
session.CompletePending(true);

// Now we illustrate Output from RMW directly, again using userContext to control verification - see Functions.cs
session.RMW(23, 25, userContext: 1);
session.CompletePending(true);

for (int i = 100; i < 200; i++)
session.Upsert(i, i + 10000);

Expand Down Expand Up @@ -116,31 +120,38 @@ static async Task AsyncSamples(ClientSession<long, long, long, long, byte, Funct

await session.DeleteAsync(25);

(status, _) = await session.ReadAsync(25);
long key = 25;
(status, _) = await session.ReadAsync(key);
if (status != Status.NOTFOUND)
throw new Exception("Error!");
throw new Exception($"Error! Key = {key}; Status = expected NOTFOUND, actual {status}");

key = 9999;
(status, _) = await session.ReadAsync(9999);
if (status != Status.NOTFOUND)
throw new Exception("Error!");
throw new Exception($"Error! Key = {key}; Status = expected NOTFOUND, actual {status}");

await session.DeleteAsync(9998);
key = 9998;
await session.DeleteAsync(key);

status = await session.RMWAsync(9998, 10);
(status, _) = await session.ReadAsync(9998);
if (status != Status.NOTFOUND)
throw new Exception("Error!");
throw new Exception($"Error! Key = {key}; Status = expected NOTFOUND, actual {status}");

(status, output) = await session.ReadAsync(9998);
(status, output) = await session.RMWAsync(9998, 10);
if (status != Status.NOTFOUND || output != 10)
throw new Exception($"Error! Key = {key}; Status = expected NOTFOUND, actual {status}; output = expected {10}, actual {output}");

(status, output) = await session.ReadAsync(key);
if (status != Status.OK || output != 10)
throw new Exception("Error!");
throw new Exception($"Error! Key = {key}; Status = expected OK, actual {status}; output = expected {10}, actual {output}");

status = await session.RMWAsync(9998, 10);
if (status != Status.OK)
throw new Exception("Error!");
(status, output) = await session.RMWAsync(key, 10);
if (status != Status.OK || output != 20)
throw new Exception($"Error! Key = {key}; Status = expected OK, actual {status} output = expected {10}, actual {output}");

(status, output) = await session.ReadAsync(9998);
(status, output) = await session.ReadAsync(key);
if (status != Status.OK || output != 20)
throw new Exception("Error!");
throw new Exception($"Error! Key = {key}; Status = expected OK, actual {status}, output = expected {10}, actual {output}");
}
}
}
17 changes: 13 additions & 4 deletions cs/remote/samples/FixedLenServer/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public struct Functions : IFunctions<Key, Value, Input, Output, long>
public bool SupportsLocking => false;

// Callbacks
public void RMWCompletionCallback(ref Key key, ref Input input, long ctx, Status status) { }
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { }

public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { }

Expand Down Expand Up @@ -90,17 +90,26 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)

// RMW functions
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void InitialUpdater(ref Key key, ref Input input, ref Value value) => value.value = input.value;
public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
{
value.value = input.value;
output.value = value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value)
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
{
Interlocked.Add(ref value.value, input.value);
output.value = value;
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue) => newValue.value = input.value + oldValue.value;
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output)
{
newValue.value = input.value + oldValue.value;
output.value = newValue;
}

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { }

Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.client/CallbackFunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public virtual void DeleteCompletionCallback(ref Key key, Context ctx) { }
/// <inheritdoc/>
public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
/// <inheritdoc/>
public virtual void RMWCompletionCallback(ref Key key, ref Input input, Context ctx, Status status) { }
public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
/// <inheritdoc/>
public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { }
}
Expand Down
67 changes: 58 additions & 9 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,22 @@ public Status Read(Key key, Input input, out Output output, Context userContext
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public Status RMW(ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
=> InternalRMW(MessageType.RMW, ref key, ref input, userContext, serialNo);
{
Output output = default;
return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo);
}

/// <summary>
/// RMW (read-modify-write) operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="output">Output</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public Status RMW(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
=> InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo);

/// <summary>
/// RMW (read-modify-write) operation
Expand All @@ -159,7 +174,25 @@ public Status RMW(ref Key key, ref Input input, Context userContext = default, l
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public Status RMW(Key key, Input input, Context userContext = default, long serialNo = 0)
=> InternalRMW(MessageType.RMW, ref key, ref input, userContext, serialNo);
{
Output output = default;
return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo);
}

/// <summary>
/// RMW (read-modify-write) operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="output">Output</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public Status RMW(Key key, Input input, out Output output, Context userContext = default, long serialNo = 0)
{
output = default;
return InternalRMW(MessageType.RMW, ref key, ref input, ref output, userContext, serialNo);
}

/// <summary>
/// Delete operation
Expand Down Expand Up @@ -323,21 +356,28 @@ internal void ProcessReplies(byte[] buf, int offset)
{
var status = ReadStatus(ref src);
var result = readrmwQueue.Dequeue();
if (status == Status.PENDING)
if (status == Status.OK || status == Status.NOTFOUND)
{
result.Item3 = serializer.ReadOutput(ref src);
functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
}
else if (status == Status.PENDING)
{
var p = hrw.ReadPendingSeqNo(ref src);
readRmwPendingContext.Add(p, result);
}
else
functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, result.Item4, status);
functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
break;
}
case MessageType.RMWAsync:
{
var status = ReadStatus(ref src);
var result = readrmwQueue.Dequeue();
var tcs = tcsQueue.Dequeue();
if (status == Status.PENDING)
if (status == Status.OK || status == Status.NOTFOUND)
tcs.SetResult((status, serializer.ReadOutput(ref src)));
else if (status == Status.PENDING)
{
var p = hrw.ReadPendingSeqNo(ref src);
readRmwPendingTcs.Add(p, tcs);
Expand Down Expand Up @@ -427,7 +467,13 @@ private void HandlePending(ref byte* src)
readRmwPendingContext.TryGetValue(p, out var result);
readRmwPendingContext.Remove(p);
#endif
functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, result.Item4, status);
if (status == Status.OK || status == Status.NOTFOUND)
{
result.Item3 = serializer.ReadOutput(ref src);
functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
}
else
functions.RMWCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
break;
}
case MessageType.RMWAsync:
Expand All @@ -439,7 +485,10 @@ private void HandlePending(ref byte* src)
readRmwPendingTcs.TryGetValue(p, out var result);
readRmwPendingTcs.Remove(p);
#endif
result.SetResult((status, default));
if (status == Status.OK || status == Status.NOTFOUND)
result.SetResult((status, serializer.ReadOutput(ref src)));
else
result.SetResult((status, default));
break;
}
default:
Expand Down Expand Up @@ -528,7 +577,7 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
while (true)
{
Expand All @@ -540,7 +589,7 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, default, userContext));
readrmwQueue.Enqueue((key, input, output, userContext));
return Status.PENDING;
}
Flush();
Expand Down
Loading