Skip to content

Commit

Permalink
[C#] Pub/sub functionality in remote FasterKV (#514)
Browse files Browse the repository at this point in the history
* Add client side support for SubscribeKV

* basic working impl

* Separate span byte serializer into client and server

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys (#499)

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys

* Added prefix matching, but similar to redis and unoptimized

* Added null checking for subscriptions on removeSubscriptions

* Made changes to correct concurrency and type checking

* Optimized and corrected the Start() method and Publish() method in publish call

* Added prefix subscriptions. Added a new client serializer call, need to discuss

* Corrected few nits

* Cleanup of code, and resolving NIE

* Nit fix of null checking

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case (#511)

* Resolving merge conflicts

* Removing the subscriptions belonging to a session on disconnection

* Added unit tests and VarLenClient test

* nit fix

* Made broker associated with FasterServer and not FasterKVProvider

* fixed a bug for large values in pub-sub

* Changed the SubscribeKVBroker to be spawned by the user and passed to provider

* fixed nit

* Merged with recent master code

* Checked fixed len client

* Fixed bug in Write of key in server serializer

* Added null check for broker

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
rohankadekodi-msr and badrishc authored Aug 13, 2021
1 parent 3a77bc0 commit 8267ce1
Show file tree
Hide file tree
Showing 39 changed files with 958 additions and 26 deletions.
4 changes: 4 additions & 0 deletions cs/remote/benchmark/FASTER.benchmark/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,9 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu
public void UpsertCompletionCallback(ref Key key, ref Value value, Empty ctx)
{
}

public void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}
}
}
4 changes: 4 additions & 0 deletions cs/remote/samples/FixedLenClient/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo
}
}

public override void SubscribeKVCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
}

public override void RMWCompletionCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
if (ctx == 1)
Expand Down
32 changes: 31 additions & 1 deletion cs/remote/samples/FixedLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.client;

Expand Down Expand Up @@ -31,14 +32,18 @@ static void Main(string[] args)

// Create a client session to the FasterKV server.
// Sessions are mono-threaded, similar to normal FasterKV sessions.
using var session = client.NewSession(new Functions()); // Uses protocol WireFormat.DefaultFixedLenKV by default
using var session = client.NewSession(new Functions());
using var session2 = client.NewSession(new Functions());

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, byte, Functions, FixedLenSerializer<long, long, long, long>>(new Functions(), new FixedLenSerializer<long, long, long, long>());

// Samples using sync client API
SyncSamples(session);

// Samples using sync subscription client API
SyncSubscriptionSamples(session, session2);

// Samples using async client API
AsyncSamples(session).Wait();

Expand All @@ -47,6 +52,9 @@ static void Main(string[] args)

static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
session.Upsert(23, 23 + 10000);
session.CompletePending(true);

for (int i = 0; i < 1000; i++)
session.Upsert(i, i + 10000);

Expand Down Expand Up @@ -94,6 +102,28 @@ static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, F
session.CompletePending(true);
}

static void SyncSubscriptionSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session, ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session2)
{
session2.SubscribeKV(23);
session2.CompletePending(true);

for (int i = 0; i < 1000000; i++)
session.Upsert(23, i + 10);

// Flushes partially filled batches, does not wait for responses
session.Flush();
session.CompletePending(true);

session.RMW(23, 25);
session.CompletePending(true);

session.Flush();
session.CompletePending(true);

Thread.Sleep(1000);
}


static async Task AsyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
for (int i = 0; i < 1000; i++)
Expand Down
4 changes: 3 additions & 1 deletion cs/remote/samples/FixedLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ static void FixedLenServer(string[] args)
var store = new FasterKV<Key, Value>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>());

// This fixed-length session provider can be used with compatible clients such as FixedLenClient and FASTER.benchmark
// Uses FixedLenSerializer as our in-built serializer for blittable (fixed length) types
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions());
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
6 changes: 6 additions & 0 deletions cs/remote/samples/VarLenClient/CustomTypeFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ public override void ReadCompletionCallback(ref CustomType key, ref CustomType i
throw new Exception("Unexpected user context");
}
}

public override void SubscribeKVCallback(ref CustomType key, ref CustomType input, ref CustomType output, byte ctx, Status status)
{
}


}
}
34 changes: 30 additions & 4 deletions cs/remote/samples/VarLenClient/CustomTypeSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ public void Run(string ip, int port)

// Create a session to FasterKV server
// Sessions are mono-threaded, similar to normal FasterKV sessions
using var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, long, Functions, BlittableParameterSerializer<long, long, long, long>>(new Functions(), new BlittableParameterSerializer<long, long, long, long>());
var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);
var subSession = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Samples using sync client API
SyncVarLenSamples(session);

// Samples using sync client API
SyncVarLenSubscriptionSamples(session, subSession);

// Samples using async client API
AsyncVarLenSamples(session).Wait();
}
Expand All @@ -53,6 +54,31 @@ void SyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomT
session.CompletePending(true);
}

void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.SubscribeKV(new CustomType(23));
session2.CompletePending(true);

session2.SubscribeKV(new CustomType(24));
session2.CompletePending(true);

session2.PSubscribeKV(new CustomType(25));
session2.CompletePending(true);

session.Upsert(new CustomType(23), new CustomType(2300));
session.CompletePending(true);

session.Upsert(new CustomType(24), new CustomType(2400));
session.CompletePending(true);

session.Upsert(new CustomType(25), new CustomType(2500));
session.CompletePending(true);

System.Threading.Thread.Sleep(1000);
}


async Task AsyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session)
{
// By default, we flush async operations as soon as they are issued
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/samples/VarLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static void Main(string[] args)
ip = args[0];
if (args.Length > 1 && args[1] != "-")
port = int.Parse(args[1]);

new MemoryBenchmark().Run(ip, port);
new MemorySamples().Run(ip, port);
new CustomTypeSamples().Run(ip, port);
Expand Down
4 changes: 3 additions & 1 deletion cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ static void VarLenServer(string[] args)
var store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer());

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new SpanByteFasterKVProvider(store);
var provider = new SpanByteFasterKVProvider(store, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
2 changes: 2 additions & 0 deletions cs/remote/src/FASTER.client/CallbackFunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ public virtual void ReadCompletionCallback(ref Key key, ref Input input, ref Out
public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
/// <inheritdoc/>
public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { }
/// <inheritdoc/>
public virtual void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
}
}
128 changes: 128 additions & 0 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public unsafe sealed partial class ClientSession<Key, Value, Input, Output, Cont
readonly int bufferSize;
readonly WireFormat wireFormat;
readonly MaxSizeSettings maxSizeSettings;
private bool subscriptionSession;

bool disposed;
ReusableObject<SeaaBuffer> sendObject;
Expand Down Expand Up @@ -64,6 +65,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w
this.bufferSize = BufferSizeUtils.ClientBufferSize(this.maxSizeSettings);
this.messageManager = new NetworkSender(bufferSize);
this.disposed = false;
this.subscriptionSession = false;

upsertQueue = new ElasticCircularBuffer<(Key, Value, Context)>();
readrmwQueue = new ElasticCircularBuffer<(Key, Input, Output, Context)>();
Expand Down Expand Up @@ -214,6 +216,30 @@ public Status Delete(ref Key key, Context userContext = default, long serialNo =
public Status Delete(Key key, Context userContext = default, long serialNo = 0)
=> InternalDelete(MessageType.Delete, ref key, userContext, serialNo);

/// <summary>
/// SubscribeKV operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void SubscribeKV(Key key, Input input = default, Context userContext = default, long serialNo = 0)
=> InternalSubscribeKV(MessageType.SubscribeKV, ref key, ref input, userContext, serialNo);

/// <summary>
/// PSubscribeKV operation
/// </summary>
/// <param name="prefix">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void PSubscribeKV(Key prefix, Input input = default, Context userContext = default, long serialNo = 0)
=> InternalSubscribeKV(MessageType.PSubscribeKV, ref prefix, ref input, userContext, serialNo);



/// <summary>
/// Flush current buffer of outgoing messages. Does not wait for responses.
/// </summary>
Expand Down Expand Up @@ -401,6 +427,60 @@ internal void ProcessReplies(byte[] buf, int offset)
tcs.SetResult((status, default));
break;
}
case MessageType.SubscribeKV:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item3 = serializer.ReadOutput(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
}
else if (status == Status.NOTFOUND)
{
readRmwPendingContext.TryGetValue(p, out var result);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
}
else if (status == Status.PENDING)
{
var result = readrmwQueue.Dequeue();
readRmwPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PSubscribeKV:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item1 = serializer.ReadKey(ref src);
result.Item3 = serializer.ReadOutput(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, Status.OK);
}
else if (status == Status.NOTFOUND)
{
readRmwPendingContext.TryGetValue(p, out var result);
result.Item1 = serializer.ReadKey(ref src);
functions.SubscribeKVCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, Status.NOTFOUND);
}
else if (status == Status.PENDING)
{
var result = readrmwQueue.Dequeue();
readRmwPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PendingResult:
{
HandlePending(ref src);
Expand Down Expand Up @@ -491,6 +571,24 @@ private void HandlePending(ref byte* src)
result.SetResult((status, default));
break;
}
case MessageType.SubscribeKV:
{
var status = ReadStatus(ref src);
if (!readRmwPendingContext.TryGetValue(p, out var result))
{
Debug.WriteLine("Received unexpected subsription key");
break;
}

if (status == Status.OK)
{
result.Item3 = serializer.ReadOutput(ref src);
functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref result.Item3, result.Item4, status);
}
else
functions.ReadCompletionCallback(ref result.Item1, ref result.Item2, ref defaultOutput, result.Item4, status);
break;
}
default:
{
throw new NotImplementedException();
Expand Down Expand Up @@ -539,6 +637,8 @@ private Socket GetSendSocket(string address, int port, int millisecondsTimeout =
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -557,9 +657,33 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key, ref Input input, Context userContext = default, long serialNo = 0)
{
subscriptionSession = true;

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, default, userContext));
return Status.PENDING;
}
Flush();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -581,6 +705,8 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand All @@ -602,6 +728,8 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalDelete(MessageType messageType, ref Key key, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
Expand Down
Loading

0 comments on commit 8267ce1

Please sign in to comment.