Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Pub/sub functionality in remote FasterKV #514

Merged
merged 31 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e776793
Add client side support for SubscribeKV
badrishc Jun 3, 2021
ab779db
basic working impl
badrishc Jun 8, 2021
ee0de2b
Merge branch 'master' into remote-subkv
badrishc Jun 11, 2021
0981e62
Merge branch 'master' into remote-subkv
badrishc Jun 12, 2021
871ffed
Merge branch 'master' into remote-subkv
badrishc Jun 14, 2021
4b71e1b
Separate span byte serializer into client and server
badrishc Jun 15, 2021
7efa394
Added support in Subscriptions. TODO: prefix matching for subscriptio…
rohankadekodi-msr Jun 22, 2021
1d23ca1
Fixed small nit in MemoryparamSerializer to return Memory<byte> of co…
rohankadekodi-msr Jun 22, 2021
c2c8139
Fixed small nit in MemoryparamSerializer to return Memory<byte> of co…
rohankadekodi-msr Jun 23, 2021
c614639
Merge remote-tracking branch 'upstream/master' into remote-subkv
rohankadekodi-msr Jun 27, 2021
ecb1632
Resolving merge conflicts
rohankadekodi-msr Jun 27, 2021
5dea808
Removing the subscriptions belonging to a session on disconnection
rohankadekodi-msr Jun 28, 2021
0a6e79e
Added unit tests and VarLenClient test
rohankadekodi-msr Jul 2, 2021
0c83db1
nit fix
rohankadekodi-msr Jul 2, 2021
ada38d7
Merge branch 'microsoft:remote-subkv' into remote-subkv
rohankadekodi-msr Jul 2, 2021
b0c7cba
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 2, 2021
d5c28d5
Made broker associated with FasterServer and not FasterKVProvider
rohankadekodi-msr Jul 6, 2021
c9e9b1b
fixed a bug for large values in pub-sub
rohankadekodi-msr Jul 6, 2021
aea0ea2
Changed the SubscribeKVBroker to be spawned by the user and passed to…
rohankadekodi-msr Jul 8, 2021
b69185c
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 8, 2021
1010c89
fixed nit
rohankadekodi-msr Jul 8, 2021
afbcd35
Merge branch 'remote-subkv' of github.com:rohankadekodi-msr/FASTER in…
rohankadekodi-msr Jul 8, 2021
c315c4a
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 12, 2021
212b787
Merged with recent master code
rohankadekodi-msr Jul 12, 2021
ac52566
Checked fixed len client
rohankadekodi-msr Jul 12, 2021
9e02a5a
Merge branch 'master' into remote-subkv
badrishc Jul 16, 2021
1f05cf0
Fixed bug in Write of key in server serializer
rohankadekodi-msr Jul 23, 2021
4ed33d7
Merge branch 'remote-subkv' of github.com:rohankadekodi-msr/FASTER in…
rohankadekodi-msr Jul 23, 2021
c2d6cd4
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 27, 2021
c0982af
Added null check for broker
rohankadekodi-msr Jul 27, 2021
291f7b9
Merge branch 'master' into remote-subkv
badrishc Aug 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cs/remote/benchmark/FASTER.benchmark/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,9 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu
public void UpsertCompletionCallback(ref Key key, ref Value value, Empty ctx)
{
}

public void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}
}
}
4 changes: 4 additions & 0 deletions cs/remote/samples/FixedLenClient/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo
}
}

public override void SubscribeKVCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
}

public override void RMWCompletionCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
if (ctx == 1)
Expand Down
32 changes: 31 additions & 1 deletion cs/remote/samples/FixedLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.client;

Expand Down Expand Up @@ -31,14 +32,18 @@ static void Main(string[] args)

// Create a client session to the FasterKV server.
// Sessions are mono-threaded, similar to normal FasterKV sessions.
using var session = client.NewSession(new Functions()); // Uses protocol WireFormat.DefaultFixedLenKV by default
using var session = client.NewSession(new Functions());
using var session2 = client.NewSession(new Functions());

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, byte, Functions, FixedLenSerializer<long, long, long, long>>(new Functions(), new FixedLenSerializer<long, long, long, long>());

// Samples using sync client API
SyncSamples(session);

// Samples using sync subscription client API
SyncSubscriptionSamples(session, session2);

// Samples using async client API
AsyncSamples(session).Wait();

Expand All @@ -47,6 +52,9 @@ static void Main(string[] args)

static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
session.Upsert(23, 23 + 10000);
session.CompletePending(true);

for (int i = 0; i < 1000; i++)
session.Upsert(i, i + 10000);

Expand Down Expand Up @@ -94,6 +102,28 @@ static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, F
session.CompletePending(true);
}

static void SyncSubscriptionSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session, ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session2)
{
session2.SubscribeKV(23);
session2.CompletePending(true);

for (int i = 0; i < 1000000; i++)
session.Upsert(23, i + 10);

// Flushes partially filled batches, does not wait for responses
session.Flush();
session.CompletePending(true);

session.RMW(23, 25);
session.CompletePending(true);

session.Flush();
session.CompletePending(true);

Thread.Sleep(1000);
}


static async Task AsyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
for (int i = 0; i < 1000; i++)
Expand Down
4 changes: 3 additions & 1 deletion cs/remote/samples/FixedLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ static void FixedLenServer(string[] args)
var store = new FasterKV<Key, Value>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>());

// This fixed-length session provider can be used with compatible clients such as FixedLenClient and FASTER.benchmark
// Uses FixedLenSerializer as our in-built serializer for blittable (fixed length) types
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions());
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
6 changes: 6 additions & 0 deletions cs/remote/samples/VarLenClient/CustomTypeFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ public override void ReadCompletionCallback(ref CustomType key, ref CustomType i
throw new Exception("Unexpected user context");
}
}

public override void SubscribeKVCallback(ref CustomType key, ref CustomType input, ref CustomType output, byte ctx, Status status)
{
}


}
}
34 changes: 30 additions & 4 deletions cs/remote/samples/VarLenClient/CustomTypeSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ public void Run(string ip, int port)

// Create a session to FasterKV server
// Sessions are mono-threaded, similar to normal FasterKV sessions
using var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, long, Functions, BlittableParameterSerializer<long, long, long, long>>(new Functions(), new BlittableParameterSerializer<long, long, long, long>());
var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);
var subSession = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Samples using sync client API
SyncVarLenSamples(session);

// Samples using sync client API
SyncVarLenSubscriptionSamples(session, subSession);

// Samples using async client API
AsyncVarLenSamples(session).Wait();
}
Expand All @@ -53,6 +54,31 @@ void SyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomT
session.CompletePending(true);
}

void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.SubscribeKV(new CustomType(23));
session2.CompletePending(true);

session2.SubscribeKV(new CustomType(24));
session2.CompletePending(true);

session2.PSubscribeKV(new CustomType(25));
session2.CompletePending(true);

session.Upsert(new CustomType(23), new CustomType(2300));
session.CompletePending(true);

session.Upsert(new CustomType(24), new CustomType(2400));
session.CompletePending(true);

session.Upsert(new CustomType(25), new CustomType(2500));
session.CompletePending(true);

System.Threading.Thread.Sleep(1000);
}


async Task AsyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session)
{
// By default, we flush async operations as soon as they are issued
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/samples/VarLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static void Main(string[] args)
ip = args[0];
if (args.Length > 1 && args[1] != "-")
port = int.Parse(args[1]);

new MemoryBenchmark().Run(ip, port);
new MemorySamples().Run(ip, port);
new CustomTypeSamples().Run(ip, port);
Expand Down
4 changes: 3 additions & 1 deletion cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ static void VarLenServer(string[] args)
var store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer());

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new SpanByteFasterKVProvider(store);
var provider = new SpanByteFasterKVProvider(store, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
2 changes: 2 additions & 0 deletions cs/remote/src/FASTER.client/CallbackFunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Out
public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
/// <inheritdoc/>
public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { }
/// <inheritdoc/>
public virtual void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
}
}
128 changes: 128 additions & 0 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public unsafe sealed partial class ClientSession<Key, Value, Input, Output, Cont
readonly int bufferSize;
readonly WireFormat wireFormat;
readonly MaxSizeSettings maxSizeSettings;
private bool subscriptionSession;

bool disposed;
ReusableObject<SeaaBuffer> sendObject;
Expand Down Expand Up @@ -64,6 +65,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w
this.bufferSize = BufferSizeUtils.ClientBufferSize(this.maxSizeSettings);
this.messageManager = new NetworkSender(bufferSize);
this.disposed = false;
this.subscriptionSession = false;

upsertQueue = new ElasticCircularBuffer<(Key, Value, Context)>();
readrmwQueue = new ElasticCircularBuffer<(Key, Input, Output, Context)>();
Expand Down Expand Up @@ -214,6 +216,30 @@ public Status Delete(ref Key key, Context userContext = default, long serialNo =
public Status Delete(Key key, Context userContext = default, long serialNo = 0)
=> InternalDelete(MessageType.Delete, ref key, userContext, serialNo);

/// <summary>
/// SubscribeKV operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void SubscribeKV(Key key, Input input = default, Context userContext = default, long serialNo = 0)
=> InternalSubscribeKV(MessageType.SubscribeKV, ref key, ref input, userContext, serialNo);

/// <summary>
/// PSubscribeKV operation
/// </summary>
/// <param name="prefix">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void PSubscribeKV(Key prefix, Input input = default, Context userContext = default, long serialNo = 0)
=> InternalSubscribeKV(MessageType.PSubscribeKV, ref prefix, ref input, userContext, serialNo);



/// <summary>
/// Flush current buffer of outgoing messages. Does not wait for responses.
/// </summary>
Expand Down Expand Up @@ -401,6 +427,60 @@ internal void ProcessReplies(byte[] buf, int offset)
tcs.SetResult((status, default));
break;
}
case MessageType.SubscribeKV:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item3 = serializer.ReadOutput(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
}
else if (status == Status.NOTFOUND)
{
readRmwPendingContext.TryGetValue(p, out var result);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
}
else if (status == Status.PENDING)
{
var result = readrmwQueue.Dequeue();
readRmwPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PSubscribeKV:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item1 = serializer.ReadKey(ref src);
result.Item3 = serializer.ReadOutput(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
}
else if (status == Status.NOTFOUND)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item1 = serializer.ReadKey(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
}
else if (status == Status.PENDING)
{
var result = readrmwQueue.Dequeue();
readRmwPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PendingResult:
{
HandlePending(ref src);
Expand Down Expand Up @@ -491,6 +571,24 @@ private void HandlePending(ref byte* src)
result.SetResult((status, default));
break;
}
case MessageType.SubscribeKV:
{
var status = ReadStatus(ref src);
if (!readRmwPendingContext.TryGetValue(p, out var result))
{
Debug.WriteLine("Received unexpected subsription key");
break;
}

if (status == Status.OK)
{
result.Item3 = serializer.ReadOutput(ref src);
functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
}
else
functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
break;
}
default:
{
throw new NotImplementedException();
Expand Down Expand Up @@ -539,6 +637,8 @@ private Socket GetSendSocket(string address, int port, int millisecondsTimeout =
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -557,9 +657,33 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
{
subscriptionSession = true;

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, default, userContext));
return Status.PENDING;
}
Flush();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -581,6 +705,8 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -602,6 +728,8 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalDelete(MessageType messageType, ref Key key, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand Down
Loading