Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Remote pubsub #531

Merged
merged 34 commits into from
Aug 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e776793
Add client side support for SubscribeKV
badrishc Jun 3, 2021
ab779db
basic working impl
badrishc Jun 8, 2021
ee0de2b
Merge branch 'master' into remote-subkv
badrishc Jun 11, 2021
0981e62
Merge branch 'master' into remote-subkv
badrishc Jun 12, 2021
871ffed
Merge branch 'master' into remote-subkv
badrishc Jun 14, 2021
4b71e1b
Separate span byte serializer into client and server
badrishc Jun 15, 2021
7efa394
Added support in Subscriptions. TODO: prefix matching for subscriptio…
rohankadekodi-msr Jun 22, 2021
1d23ca1
Fixed small nit in MemoryparamSerializer to return Memory<byte> of co…
rohankadekodi-msr Jun 22, 2021
c2c8139
Fixed small nit in MemoryparamSerializer to return Memory<byte> of co…
rohankadekodi-msr Jun 23, 2021
c614639
Merge remote-tracking branch 'upstream/master' into remote-subkv
rohankadekodi-msr Jun 27, 2021
ecb1632
Resolving merge conflicts
rohankadekodi-msr Jun 27, 2021
5dea808
Removing the subscriptions belonging to a session on disconnection
rohankadekodi-msr Jun 28, 2021
0a6e79e
Added unit tests and VarLenClient test
rohankadekodi-msr Jul 2, 2021
0c83db1
nit fix
rohankadekodi-msr Jul 2, 2021
ada38d7
Merge branch 'microsoft:remote-subkv' into remote-subkv
rohankadekodi-msr Jul 2, 2021
b0c7cba
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 2, 2021
d5c28d5
Made broker associated with FasterServer and not FasterKVProvider
rohankadekodi-msr Jul 6, 2021
c9e9b1b
fixed a bug for large values in pub-sub
rohankadekodi-msr Jul 6, 2021
aea0ea2
Changed the SubscribeKVBroker to be spawned by the user and passed to…
rohankadekodi-msr Jul 8, 2021
b69185c
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 8, 2021
1010c89
fixed nit
rohankadekodi-msr Jul 8, 2021
afbcd35
Merge branch 'remote-subkv' of github.com:rohankadekodi-msr/FASTER in…
rohankadekodi-msr Jul 8, 2021
c315c4a
Merge branch 'master' into remote-subkv
rohankadekodi-msr Jul 12, 2021
212b787
Merged with recent master code
rohankadekodi-msr Jul 12, 2021
ac52566
Checked fixed len client
rohankadekodi-msr Jul 12, 2021
9e02a5a
Merge branch 'master' into remote-subkv
badrishc Jul 16, 2021
1f05cf0
Fixed bug in Write of key in server serializer
rohankadekodi-msr Jul 23, 2021
4ed33d7
Merge branch 'remote-subkv' of github.com:rohankadekodi-msr/FASTER in…
rohankadekodi-msr Jul 23, 2021
fc0129d
Working with non-kv pub-sub. Based off remote-subkv PR
rohankadekodi-msr Jul 25, 2021
a165a30
Merge branch 'master' into remote-pubsub
rohankadekodi-msr Jul 25, 2021
1474246
Merge branch 'remote-pubsub' into remote-pubsub
badrishc Aug 13, 2021
38df74a
Merge branch 'remote-pubsub' into remote-pubsub
rohankadekodi-msr Aug 15, 2021
193b6fc
Fixed some nits, making tests work
rohankadekodi-msr Aug 15, 2021
967ca82
Changed the subscriptions and prefixSubscriptions dictionary to use s…
rohankadekodi-msr Aug 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cs/remote/benchmark/FASTER.benchmark/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,12 @@ public void UpsertCompletionCallback(ref Key key, ref Value value, Empty ctx)
public void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Empty ctx, Status status)
{
}

public void PublishCompletionCallback(ref Key key, ref Value value, Empty ctx)
{
}
public void SubscribeCallback(ref Key key, ref Value value, Empty ctx)
{
}
}
}
7 changes: 5 additions & 2 deletions cs/remote/samples/FixedLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ static void FixedLenServer(string[] args)
var store = new FasterKV<Key, Value>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>());
// Create a broker for pub-sub of key-value pairs in remote FASTER instance
var kvBroker = new SubscribeKVBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>());
// Create a broker for pub-sub of key-value pairs
var broker = new SubscribeBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>());

// This fixed-length session provider can be used with compatible clients such as FixedLenClient and FASTER.benchmark
// Uses FixedLenSerializer as our in-built serializer for blittable (fixed length) types
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), broker);
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
4 changes: 3 additions & 1 deletion cs/remote/samples/VarLenClient/CustomTypeFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public override void SubscribeKVCallback(ref CustomType key, ref CustomType inpu
{
}


public override void SubscribeCallback(ref CustomType key, ref CustomType value, byte ctx)
{
}
}
}
29 changes: 28 additions & 1 deletion cs/remote/samples/VarLenClient/CustomTypeSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Threading;
using System.Threading.Tasks;
using FASTER.client;
using FASTER.common;
Expand All @@ -28,6 +29,9 @@ public void Run(string ip, int port)
// Samples using sync client API
SyncVarLenSamples(session);

// Samples using sync client API
SyncVarLenSubscriptionKVSamples(session, subSession);

// Samples using sync client API
SyncVarLenSubscriptionSamples(session, subSession);

Expand All @@ -54,7 +58,7 @@ void SyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomT
session.CompletePending(true);
}

void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
void SyncVarLenSubscriptionKVSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.SubscribeKV(new CustomType(23));
Expand All @@ -78,6 +82,29 @@ void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomT
System.Threading.Thread.Sleep(1000);
}

void SyncVarLenSubscriptionSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session,
ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session2)
{
session2.Subscribe(new CustomType(23));
session2.CompletePending(true);

session2.Subscribe(new CustomType(24));
session2.CompletePending(true);

session2.PSubscribe(new CustomType(25));
session2.CompletePending(true);

session.Publish(new CustomType(23), new CustomType(2300));
session.CompletePending(true);

session.Publish(new CustomType(24), new CustomType(2400));
session.CompletePending(true);

session.Publish(new CustomType(25), new CustomType(2500));
session.CompletePending(true);

System.Threading.Thread.Sleep(1000);
}

async Task AsyncVarLenSamples(ClientSession<CustomType, CustomType, CustomType, CustomType, byte, CustomTypeFunctions, FixedLenSerializer<CustomType, CustomType, CustomType, CustomType>> session)
{
Expand Down
7 changes: 5 additions & 2 deletions cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ static void VarLenServer(string[] args)
var store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

var broker = new SubscribeKVBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer());
// Create a broker for pub-sub of key value-pairs in remote FASTER instance
var kvBroker = new SubscribeKVBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer());
// Create a broker for pub-sub of key-value pairs
var broker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer());

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new SpanByteFasterKVProvider(store, broker);
var provider = new SpanByteFasterKVProvider(store, kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
4 changes: 4 additions & 0 deletions cs/remote/src/FASTER.client/CallbackFunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ public virtual void RMWCompletionCallback(ref Key key, ref Input input, ref Outp
public virtual void UpsertCompletionCallback(ref Key key, ref Value value, Context ctx) { }
/// <inheritdoc/>
public virtual void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status) { }
/// <inheritdoc/>
public virtual void PublishCompletionCallback(ref Key key, ref Value value, Context ctx) { }
/// <inheritdoc/>
public virtual void SubscribeCallback(ref Key key, ref Value value, Context ctx) { }
}
}
136 changes: 133 additions & 3 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public unsafe sealed partial class ClientSession<Key, Value, Input, Output, Cont

readonly ElasticCircularBuffer<(Key, Value, Context)> upsertQueue;
readonly ElasticCircularBuffer<(Key, Input, Output, Context)> readrmwQueue;
readonly ElasticCircularBuffer<(Key, Value, Context)> pubsubQueue;
readonly ElasticCircularBuffer<TaskCompletionSource<(Status, Output)>> tcsQueue;

/// <summary>
Expand All @@ -69,6 +70,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w

upsertQueue = new ElasticCircularBuffer<(Key, Value, Context)>();
readrmwQueue = new ElasticCircularBuffer<(Key, Input, Output, Context)>();
pubsubQueue = new ElasticCircularBuffer<(Key, Value, Context)>();
tcsQueue = new ElasticCircularBuffer<TaskCompletionSource<(Status, Output)>>();

numPendingBatches = 0;
Expand Down Expand Up @@ -238,7 +240,38 @@ public void SubscribeKV(Key key, Input input = default, Context userContext = de
public void PSubscribeKV(Key prefix, Input input = default, Context userContext = default, long serialNo = 0)
=> InternalSubscribeKV(MessageType.PSubscribeKV, ref prefix, ref input, userContext, serialNo);

/// <summary>
/// Upsert operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="desiredValue">Desired value</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public Status Publish(Key key, Value desiredValue, Context userContext = default, long serialNo = 0)
=> InternalPublish(MessageType.Publish, ref key, ref desiredValue, userContext, serialNo);

/// <summary>
/// SubscribeKV operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void Subscribe(Key key, Context userContext = default, long serialNo = 0)
=> InternalSubscribe(MessageType.Subscribe, ref key, userContext, serialNo);

/// <summary>
/// PSubscribe operation
/// </summary>
/// <param name="prefix">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
public void PSubscribe(Key prefix, Context userContext = default, long serialNo = 0)
=> InternalSubscribe(MessageType.PSubscribe, ref prefix, userContext, serialNo);

/// <summary>
/// Flush current buffer of outgoing messages. Does not wait for responses.
Expand Down Expand Up @@ -307,6 +340,7 @@ public void Dispose(bool completePending)


int lastSeqNo = -1;
readonly Dictionary<int, (Key, Value, Context)> pubsubPendingContext = new();
readonly Dictionary<int, (Key, Input, Output, Context)> readRmwPendingContext = new();
readonly Dictionary<int, TaskCompletionSource<(Status, Output)>> readRmwPendingTcs = new();

Expand Down Expand Up @@ -481,6 +515,56 @@ internal void ProcessReplies(byte[] buf, int offset)
}
break;
}
case MessageType.Publish:
{
var status = ReadStatus(ref src);
(Key, Value, Context) result = upsertQueue.Dequeue();
functions.PublishCompletionCallback(ref result.Item1, ref result.Item2, result.Item3);
break;
}
case MessageType.Subscribe:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
pubsubPendingContext.TryGetValue(p, out var result);
result.Item2 = serializer.ReadValue(ref src);
functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
}
else if (status == Status.PENDING)
{
var result = pubsubQueue.Dequeue();
pubsubPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PSubscribe:
{
var status = ReadStatus(ref src);
var p = hrw.ReadPendingSeqNo(ref src);
if (status == Status.OK)
{
pubsubPendingContext.TryGetValue(p, out var result);
result.Item1 = serializer.ReadKey(ref src);
result.Item2 = serializer.ReadValue(ref src);
functions.SubscribeCallback(ref result.Item1, ref result.Item2, result.Item3);
}
else if (status == Status.PENDING)
{
var result = pubsubQueue.Dequeue();
pubsubPendingContext.Add(p, result);
}
else
{
throw new Exception("Unexpected status of SubscribeKV");
}
break;
}
case MessageType.PendingResult:
{
HandlePending(ref src);
Expand Down Expand Up @@ -667,12 +751,58 @@ private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key,
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, default, userContext));
return Status.PENDING;
}
Flush();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalPublish(MessageType messageType, ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0)
{
Debug.Assert(!subscriptionSession);

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref desiredValue, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
upsertQueue.Enqueue((key, desiredValue, userContext));
return Status.PENDING;
}
Flush();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private unsafe Status InternalSubscribe(MessageType messageType, ref Key key, Context userContext = default, long serialNo = 0)
{
subscriptionSession = true;

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, default, userContext));
pubsubQueue.Enqueue((key, default, userContext));
return Status.PENDING;
}
Flush();
Expand Down
8 changes: 8 additions & 0 deletions cs/remote/src/FASTER.client/FixedLenSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ public Key ReadKey(ref byte* src)
return Unsafe.AsRef<Key>(_src);
}

/// <inheritdoc />
public Value ReadValue(ref byte* src)
{
var _src = src;
src += Unsafe.SizeOf<Value>();
return Unsafe.AsRef<Value>(_src);
}

/// <inheritdoc />
public Output ReadOutput(ref byte* src)
{
Expand Down
16 changes: 16 additions & 0 deletions cs/remote/src/FASTER.client/ICallbackFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,21 @@ public interface ICallbackFunctions<Key, Value, Input, Output, Context>
/// <param name="ctx"></param>
/// <param name="status"></param>
void SubscribeKVCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status);

/// <summary>
/// Publish completion
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="ctx"></param>
void PublishCompletionCallback(ref Key key, ref Value value, Context ctx);

/// <summary>
/// Subscribe callback
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="ctx"></param>
void SubscribeCallback(ref Key key, ref Value value, Context ctx);
}
}
4 changes: 4 additions & 0 deletions cs/remote/src/FASTER.client/MemoryFunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ public virtual void RMWCompletionCallback(ref ReadOnlyMemory<T> key, ref ReadOnl
public virtual void UpsertCompletionCallback(ref ReadOnlyMemory<T> key, ref ReadOnlyMemory<T> value, byte ctx) { }
/// <inheritdoc />
public virtual void SubscribeKVCallback(ref ReadOnlyMemory<T> key, ref ReadOnlyMemory<T> input, ref (IMemoryOwner<T>, int) output, byte ctx, Status status) { }
/// <inheritdoc/>
public virtual void PublishCompletionCallback(ref ReadOnlyMemory<T> key, ref ReadOnlyMemory<T> value, byte ctx) { }
/// <inheritdoc/>
public virtual void SubscribeCallback(ref ReadOnlyMemory<T> key, ref ReadOnlyMemory<T> value, byte ctx) { }
}
}
11 changes: 11 additions & 0 deletions cs/remote/src/FASTER.client/MemoryParameterSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ public ReadOnlyMemory<T> ReadKey(ref byte* src)
return (mem.Memory.Slice(0, len));
}

/// <inheritdoc />
public ReadOnlyMemory<T> ReadValue(ref byte* src)
{
var len = (*(int*)src) / sizeof(T);
var mem = memoryPool.Rent(len);
new ReadOnlySpan<byte>(src + sizeof(int), (*(int*)src)).CopyTo(
MemoryMarshal.Cast<T, byte>(mem.Memory.Span));
src += sizeof(int) + (*(int*)src);
return (mem.Memory.Slice(0, len));
}

/// <inheritdoc />
public (IMemoryOwner<T>, int) ReadOutput(ref byte* src)
{
Expand Down
Loading