Skip to content

Commit

Permalink
[C#] Support remote pub-sub and websockets (#543)
Browse files Browse the repository at this point in the history
* [C#] Pub/sub functionality in remote FasterKV (#514)

* Add client side support for SubscribeKV

* basic working impl

* Separate span byte serializer into client and server

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys (#499)

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys

* Added prefix matching, but similar to redis and unoptimized

* Added null checking for subscriptions on removeSubscriptions

* Made changes to correct concurrency and type checking

* Optimized and corrected the Start() method and Publish() method in publish call

* Added prefix subscriptions. Added a new client serializer call, need to discuss

* Corrected few nits

* Cleanup of code, and resolving NIE

* Nit fix of null checking

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case (#511)

* Resolving merge conflicts

* Removing the subscriptions belonging to a session on disconnection

* Added unit tests and VarLenClient test

* nit fix

* Made broker associated with FasterServer and not FasterKVProvider

* fixed a bug for large values in pub-sub

* Changed the SubscribeKVBroker to be spawned by the user and passed to provider

* fixed nit

* Merged with recent master code

* Checked fixed len client

* Fixed bug in Write of key in server serializer

* Added null check for broker

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>

* Improved byte array comparer, moved to FASTER.server

* refactor tests

* [C#] Remote pubsub (#531)

* Add client side support for SubscribeKV

* basic working impl

* Separate span byte serializer into client and server

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys (#499)

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys

* Added prefix matching, but similar to redis and unoptimized

* Added null checking for subscriptions on removeSubscriptions

* Made changes to correct concurrency and type checking

* Optimized and corrected the Start() method and Publish() method in publish call

* Added prefix subscriptions. Added a new client serializer call, need to discuss

* Corrected few nits

* Cleanup of code, and resolving NIE

* Nit fix of null checking

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case (#511)

* Resolving merge conflicts

* Removing the subscriptions belonging to a session on disconnection

* Added unit tests and VarLenClient test

* nit fix

* Made broker associated with FasterServer and not FasterKVProvider

* fixed a bug for large values in pub-sub

* Changed the SubscribeKVBroker to be spawned by the user and passed to provider

* fixed nit

* Merged with recent master code

* Checked fixed len client

* Fixed bug in Write of key in server serializer

* Working with non-kv pub-sub. Based off remote-subkv PR

* Fixed some nits, making tests work

* Changed the subscriptions and prefixSubscriptions dictionary to use sid as key

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>

* [C#] Remote subkv fixes (#542)

* Add client side support for SubscribeKV

* basic working impl

* Separate span byte serializer into client and server

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys (#499)

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys

* Added prefix matching, but similar to redis and unoptimized

* Added null checking for subscriptions on removeSubscriptions

* Made changes to correct concurrency and type checking

* Optimized and corrected the Start() method and Publish() method in publish call

* Added prefix subscriptions. Added a new client serializer call, need to discuss

* Corrected few nits

* Cleanup of code, and resolving NIE

* Nit fix of null checking

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case (#511)

* Resolving merge conflicts

* Removing the subscriptions belonging to a session on disconnection

* Added unit tests and VarLenClient test

* nit fix

* Made broker associated with FasterServer and not FasterKVProvider

* fixed a bug for large values in pub-sub

* Changed the SubscribeKVBroker to be spawned by the user and passed to provider

* fixed nit

* Merged with recent master code

* Checked fixed len client

* Fixed bug in Write of key in server serializer

* Added null check for broker

* Fixed a nit: subscriptions dictionary should contain key as sid and val as serversession

* fixed nit

* Removed old ByteArrayComparer

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>

* [C#] Remote websocket (#530)

* Add client side support for SubscribeKV

* basic working impl

* Separate span byte serializer into client and server

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys (#499)

* Added support in Subscriptions. TODO: prefix matching for subscriptions, publish deletes to keys

* Added prefix matching, but similar to redis and unoptimized

* Added null checking for subscriptions on removeSubscriptions

* Made changes to correct concurrency and type checking

* Optimized and corrected the Start() method and Publish() method in publish call

* Added prefix subscriptions. Added a new client serializer call, need to discuss

* Corrected few nits

* Cleanup of code, and resolving NIE

* Nit fix of null checking

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case

* Fixed small nit in MemoryparamSerializer to return Memory<byte> of correct length, and in publish call corner case (#511)

* Resolving merge conflicts

* Removing the subscriptions belonging to a session on disconnection

* Added unit tests and VarLenClient test

* nit fix

* Made broker associated with FasterServer and not FasterKVProvider

* fixed a bug for large values in pub-sub

* Changed the SubscribeKVBroker to be spawned by the user and passed to provider

* fixed nit

* Merged with recent master code

* Checked fixed len client

* Working on JS client

* Added batching in websocket handling. Next step: finish HandlePending()

* Modified websocket server. Fixed bug in Write key of server serializer

* duplicate file from faster.common

* Added sample for WebClient

* fixed small nit

* Fixed some nits in js clientsession

* fixed nit

* removed old ByteArrayComparer

* Fixed nits, added non-kv PubSub to WebsocketServerSession

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>

* Cleanup and updates.

* fix break.

* [C#] Modifications to pubsub (#547)

* Added FASTER Log for SubscribeKV and Subscribe

* Added input in subscribeKV

* Added SubscribeCallback and SubscribeKV sample in html

* set enablePubSub: false for non-pubsub tests

Co-authored-by: rohankadekodi-msr <69916400+rohankadekodi-msr@users.noreply.github.com>
  • Loading branch information
badrishc and rohankadekodi-msr authored Aug 28, 2021
1 parent 0ae0568 commit 6e13ce3
Show file tree
Hide file tree
Showing 59 changed files with 3,267 additions and 33 deletions.
7 changes: 7 additions & 0 deletions cs/remote/FASTER.remote.sln
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{48000B9F-1
..\..\docs\_data\navigation.yml = ..\..\docs\_data\navigation.yml
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "WebClient", "WebClient", "{8D1F793C-DA1E-4C75-B824-E510BB54534E}"
ProjectSection(SolutionItems) = preProject
samples\WebClient\FASTERFunctions.js = samples\WebClient\FASTERFunctions.js
samples\WebClient\WebClient.html = samples\WebClient\WebClient.html
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -162,6 +168,7 @@ Global
{4053EC35-77A5-4728-B16F-F4FDD1104CAF} = {1065AE7E-DEA5-4E21-AE39-95B93C074B17}
{6A49ADD2-DC25-47E1-9D29-5DC6380E880A} = {6B8D1038-C9D5-4111-B5CE-BF64E7D12AE1}
{2238A430-8D61-40A3-A23B-B1163A4CCBC6} = {8CF11B91-A6B6-4B81-AD43-2B07CF60F8FF}
{8D1F793C-DA1E-4C75-B824-E510BB54534E} = {1065AE7E-DEA5-4E21-AE39-95B93C074B17}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FB603D60-F72D-4DAD-9349-442A45E20276}
Expand Down
11 changes: 11 additions & 0 deletions cs/remote/benchmark/FASTER.benchmark/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,16 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu
public void UpsertCompletionCallback(ref Key key, ref Value value, Empty ctx)
{
}

public void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}

public void PublishCompletionCallback(ref Key key, ref Value value, Empty ctx)
{
}
public void SubscribeCallback(ref Key key, ref Value value, Empty ctx)
{
}
}
}
4 changes: 4 additions & 0 deletions cs/remote/samples/FixedLenClient/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo
}
}

public override void SubscribeKVCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
}

public override void RMWCompletionCallback(ref long key, ref long input, ref long output, byte ctx, Status status)
{
if (ctx == 1)
Expand Down
32 changes: 31 additions & 1 deletion cs/remote/samples/FixedLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.client;

Expand Down Expand Up @@ -31,14 +32,18 @@ static void Main(string[] args)

// Create a client session to the FasterKV server.
// Sessions are mono-threaded, similar to normal FasterKV sessions.
using var session = client.NewSession(new Functions()); // Uses protocol WireFormat.DefaultFixedLenKV by default
using var session = client.NewSession(new Functions());
using var session2 = client.NewSession(new Functions());

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, byte, Functions, FixedLenSerializer<long, long, long, long>>(new Functions(), new FixedLenSerializer<long, long, long, long>());

// Samples using sync client API
SyncSamples(session);

// Samples using sync subscription client API
SyncSubscriptionSamples(session, session2);

// Samples using async client API
AsyncSamples(session).Wait();

Expand All @@ -47,6 +52,9 @@ static void Main(string[] args)

static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
session.Upsert(23, 23 + 10000);
session.CompletePending(true);

for (int i = 0; i < 1000; i++)
session.Upsert(i, i + 10000);

Expand Down Expand Up @@ -94,6 +102,28 @@ static void SyncSamples(ClientSession<long, long, long, long, byte, Functions, F
session.CompletePending(true);
}

static void SyncSubscriptionSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session, ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session2)
{
session2.SubscribeKV(23);
session2.CompletePending(true);

for (int i = 0; i < 1000000; i++)
session.Upsert(23, i + 10);

// Flushes partially filled batches, does not wait for responses
session.Flush();
session.CompletePending(true);

session.RMW(23, 25);
session.CompletePending(true);

session.Flush();
session.CompletePending(true);

Thread.Sleep(1000);
}


static async Task AsyncSamples(ClientSession<long, long, long, long, byte, Functions, FixedLenSerializer<long, long, long, long>> session)
{
for (int i = 0; i < 1000; i++)
Expand Down
13 changes: 12 additions & 1 deletion cs/remote/samples/FixedLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,20 @@ static void FixedLenServer(string[] args)
var store = new FasterKV<Key, Value>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>> kvBroker = null;
SubscribeBroker<Key, Value, IKeySerializer<Key>> broker = null;

if (opts.EnablePubSub)
{
// Create a broker for pub-sub of key-value pairs in remote FASTER instance
kvBroker = new SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>>(new FixedLenKeyInputSerializer<Key, Input>(), opts.LogDir, true);
// Create a broker for pub-sub of key-value pairs
broker = new SubscribeBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>(), null, true);
}

// This fixed-length session provider can be used with compatible clients such as FixedLenClient and FASTER.benchmark
// Uses FixedLenSerializer as our in-built serializer for blittable (fixed length) types
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions());
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
8 changes: 8 additions & 0 deletions cs/remote/samples/VarLenClient/CustomTypeFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,13 @@ public override void ReadCompletionCallback(ref CustomType key, ref CustomType i
throw new Exception("Unexpected user context");
}
}

public override void SubscribeKVCallback(ref CustomType key, ref CustomType input, ref CustomType output, byte ctx, Status status)
{
}

public override void SubscribeCallback(ref CustomType key, ref CustomType value, byte ctx)
{
}
}
}
61 changes: 57 additions & 4 deletions cs/remote/samples/VarLenClient/CustomTypeSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Threading;
using System.Threading.Tasks;
using FASTER.client;
using FASTER.common;
Expand All @@ -22,14 +23,18 @@ public void Run(string ip, int port)

// Create a session to FasterKV server
// Sessions are mono-threaded, similar to normal FasterKV sessions
using var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Explicit version of NewSession call, where you provide all types, callback functions, and serializer
// using var session = client.NewSession<long, long, long, Functions, BlittableParameterSerializer<long, long, long, long>>(new Functions(), new BlittableParameterSerializer<long, long, long, long>());
var session = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);
var subSession = client.NewSession(new CustomTypeFunctions(), WireFormat.DefaultVarLenKV);

// Samples using sync client API
SyncVarLenSamples(session);

// Samples using sync client API
SyncVarLenSubscriptionKVSamples(session, subSession);

// Samples using sync client API
SyncVarLenSubscriptionSamples(session, subSession);

// Samples using async client API
AsyncVarLenSamples(session).Wait();
}
Expand All @@ -53,6 +58,54 @@ void SyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomT
session.CompletePending(true);
}

void SyncVarLenSubscriptionKVSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.SubscribeKV(new CustomType(23));
session2.CompletePending(true);

session2.SubscribeKV(new CustomType(24));
session2.CompletePending(true);

session2.PSubscribeKV(new CustomType(25));
session2.CompletePending(true);

session.Upsert(new CustomType(23), new CustomType(2300));
session.CompletePending(true);

session.Upsert(new CustomType(24), new CustomType(2400));
session.CompletePending(true);

session.Upsert(new CustomType(25), new CustomType(2500));
session.CompletePending(true);

System.Threading.Thread.Sleep(1000);
}

void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.Subscribe(new CustomType(23));
session2.CompletePending(true);

session2.Subscribe(new CustomType(24));
session2.CompletePending(true);

session2.PSubscribe(new CustomType(25));
session2.CompletePending(true);

session.Publish(new CustomType(23), new CustomType(2300));
session.CompletePending(true);

session.Publish(new CustomType(24), new CustomType(2400));
session.CompletePending(true);

session.Publish(new CustomType(25), new CustomType(2500));
session.CompletePending(true);

System.Threading.Thread.Sleep(1000);
}

async Task AsyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session)
{
// By default, we flush async operations as soon as they are issued
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/samples/VarLenClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static void Main(string[] args)
ip = args[0];
if (args.Length > 1 && args[1] != "-")
port = int.Parse(args[1]);

new MemoryBenchmark().Run(ip, port);
new MemorySamples().Run(ip, port);
new CustomTypeSamples().Run(ip, port);
Expand Down
3 changes: 3 additions & 0 deletions cs/remote/samples/VarLenServer/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class Options
[Option('r', "recover", Required = false, Default = false, HelpText = "Recover from latest checkpoint.")]
public bool Recover { get; set; }

[Option("pubsub", Required = false, Default = true, HelpText = "Enable pub/sub feature on server.")]
public bool EnablePubSub { get; set; }

public int MemorySizeBits()
{
long size = ParseSize(MemorySize);
Expand Down
15 changes: 14 additions & 1 deletion cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,28 @@ static void VarLenServer(string[] args)
var store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null;
SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null;

if (opts.EnablePubSub)
{
// Create a broker for pub-sub of key value-pairs in FASTER instance
kvBroker = new SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>>(new SpanByteKeyInputSerializer(), opts.LogDir, true);
// Create a broker for topic-based pub-sub of key-value pairs
broker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, true);
}

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new SpanByteFasterKVProvider(store);
var provider = new SpanByteFasterKVProvider(store, kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);

// Register provider as backend provider for WireFormat.DefaultFixedLenKV
// You can register multiple providers with the same server, with different wire protocol specifications
server.Register(WireFormat.DefaultVarLenKV, provider);
// Register provider as backend provider for WireFormat.WebSocket
server.Register(WireFormat.WebSocket, provider);

// Start server
server.Start();
Expand Down
47 changes: 47 additions & 0 deletions cs/remote/samples/WebClient/FASTERFunctions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// JavaScript source code

function writeToScreen(message) {
output.insertAdjacentHTML("afterbegin", "<p>" + message + "</p>");
}

class FASTERFunctions extends CallbackFunctionsBase {

constructor(client) {
super();
}

ReadCompletionCallback(keyBytes, outputBytes, status) {
if (status == Status.OK) {
var output = deserialize(outputBytes, 0, outputBytes.length);
writeToScreen("<span> value: " + output + " </span>");
}
}

UpsertCompletionCallback(keyBytes, valueBytes, status) {
if (status == Status.OK) {
writeToScreen("<span> PUT OK </span>");
}
}

DeleteCompletionCallback(keyBytes, status) { }

RMWCompletionCallback(keyBytes, outputBytes, status) { }

SubscribeKVCompletionCallback(keyBytes, outputBytes, status)
{
if (status == Status.OK) {
var key = deserialize(keyBytes, 0, keyBytes.length);
var output = deserialize(outputBytes, 0, outputBytes.length);
writeToScreen("<span> subscribed key: " + key + " value: " + output + " </span>");
}
}

SubscribeCompletionCallback(keyBytes, valueBytes, status)
{
if (status == Status.OK) {
var key = deserialize(keyBytes, 0, keyBytes.length);
var value = deserialize(valueBytes, 0, valueBytes.length);
writeToScreen("<span> subscribed key: " + key + " value: " + value + " </span>");
}
}
}
Loading

0 comments on commit 6e13ce3

Please sign in to comment.