Skip to content

Commit

Permalink
[C#] Fixes to non-kv PubSub (#568)
Browse files Browse the repository at this point in the history
* Removed multiple enqueues for SubscribeBroker during Publish() and replaced with just one. Disadvantage is that there are multiple small allocations happening

* Fixed nit that was causing valPtr to send wrong values when there were multiple subscribers

* Fixed couple of nits.
  • Loading branch information
rohankadekodi-msr authored Oct 13, 2021
1 parent 28f5494 commit d8bee4b
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private unsafe int Broadcast(byte[] key, byte* valPtr, int valLength, bool ascii
{
byte* keyBytePtr = ptr;
byte* nullBytePtr = null;
sub.Value.Publish(ref keyBytePtr, key.Length, ref valPtr, valLength, ref nullBytePtr, sub.Key);
byte* valBytePtr = valPtr;
sub.Value.Publish(ref keyBytePtr, key.Length, ref valBytePtr, valLength, ref nullBytePtr, sub.Key);
numSubscribers++;
}
}
Expand Down Expand Up @@ -142,18 +143,35 @@ private async Task Start(CancellationToken cancellationToken = default)

using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cancellationToken);
while (iter.GetNext(out byte[] subscriptionKey, out _, out _, out _))
while (iter.GetNext(out byte[] subscriptionKeyValueAscii, out _, out long currentAddress, out long nextAddress))
{
if (!iter.GetNext(out byte[] subscriptionValue, out _, out long currentAddress, out long nextAddress))
{
if (currentAddress >= long.MaxValue) return;
}
if (!iter.GetNext(out byte[] ascii, out _, out currentAddress, out nextAddress))
if (currentAddress >= long.MaxValue) return;

byte[] subscriptionKey;
byte[] subscriptionValue;
byte[] ascii;

unsafe
{
if (currentAddress >= long.MaxValue) return;
fixed (byte* subscriptionKeyValueAsciiPtr = &subscriptionKeyValueAscii[0])
{
int subscriptionKeyLength = *(int*)subscriptionKeyValueAsciiPtr + sizeof(int);
int subscriptionValueLength = subscriptionKeyValueAscii.Length - (subscriptionKeyLength + sizeof(bool));
subscriptionKey = new byte[subscriptionKeyLength];
subscriptionValue = new byte[subscriptionValueLength];
ascii = new byte[sizeof(bool)];

fixed (byte* subscriptionKeyPtr = &subscriptionKey[0], subscriptionValuePtr = &subscriptionValue[0], asciiPtr = &ascii[0])
{
Buffer.MemoryCopy(subscriptionKeyValueAsciiPtr, subscriptionKeyPtr, subscriptionKeyLength, subscriptionKeyLength);
Buffer.MemoryCopy(subscriptionKeyValueAsciiPtr + subscriptionKeyLength, subscriptionValuePtr, subscriptionValueLength, subscriptionValueLength);
Buffer.MemoryCopy(subscriptionKeyValueAsciiPtr + subscriptionKeyLength + subscriptionValueLength, asciiPtr, sizeof(bool), sizeof(bool));
}
}
}
truncateUntilAddress = nextAddress;
uniqueKeys.Add(subscriptionKey, (subscriptionValue, ascii));
if (!uniqueKeys.ContainsKey(subscriptionKey))
uniqueKeys.Add(subscriptionKey, (subscriptionValue, ascii));
}

if (truncateUntilAddress > log.BeginAddress)
Expand Down Expand Up @@ -362,9 +380,19 @@ public unsafe void Publish(byte* key, byte* value, int valueLength, bool ascii =
var start = key;
ref Key k = ref keySerializer.ReadKeyByRef(ref key);
// TODO: this needs to be a single atomic enqueue
log.Enqueue(new Span<byte>(start, (int)(key - start)));
log.Enqueue(new Span<byte>(value, valueLength));
log.Enqueue(new Span<byte>(Unsafe.AsPointer(ref ascii), sizeof(bool)));
byte[] logEntryBytes = new byte[(key - start) + valueLength + sizeof(bool)];
fixed (byte* logEntryBytePtr = &logEntryBytes[0])
{
byte* dst = logEntryBytePtr;
Buffer.MemoryCopy(start, dst, (key - start), (key - start));
dst += (key - start);
Buffer.MemoryCopy(value, dst, valueLength, valueLength);
dst += valueLength;
byte* asciiPtr = (byte*)&ascii;
Buffer.MemoryCopy(asciiPtr, dst, sizeof(bool), sizeof(bool));
}

log.Enqueue(logEntryBytes);
log.RefreshUncommitted();
}

Expand Down

0 comments on commit d8bee4b

Please sign in to comment.