Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Websocket bug-fixes #663

Merged
merged 4 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cs/remote/samples/WebClient/FASTERFunctions.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ class FASTERFunctions extends CallbackFunctionsBase {
}

ReadCompletionCallback(keyBytes, outputBytes, status) {
if (status == Status.OK) {
if (status == Status.Found) {
var output = deserialize(outputBytes, 0, outputBytes.length);
writeToScreen("<span> value: " + output + " </span>");
}
}

UpsertCompletionCallback(keyBytes, valueBytes, status) {
if (status == Status.OK) {
if (status == Status.Found) {
writeToScreen("<span> PUT OK </span>");
}
}
Expand All @@ -29,7 +29,7 @@ class FASTERFunctions extends CallbackFunctionsBase {

SubscribeKVCompletionCallback(keyBytes, outputBytes, status)
{
if (status == Status.OK) {
if (status == Status.Found) {
var key = deserialize(keyBytes, 0, keyBytes.length);
var output = deserialize(outputBytes, 0, outputBytes.length);
writeToScreen("<span> subscribed key: " + key + " value: " + output + " </span>");
Expand All @@ -38,7 +38,7 @@ class FASTERFunctions extends CallbackFunctionsBase {

SubscribeCompletionCallback(keyBytes, valueBytes, status)
{
if (status == Status.OK) {
if (status == Status.Found) {
var key = deserialize(keyBytes, 0, keyBytes.length);
var value = deserialize(valueBytes, 0, valueBytes.length);
writeToScreen("<span> subscribed key: " + key + " value: " + value + " </span>");
Expand Down
22 changes: 11 additions & 11 deletions cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
switch (op) {
case MessageType.Read:
var key = this.readrmwQueue.dequeue();
if (status == Status.OK) {
if (status == Status.Found) {
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
this.functions.ReadCompletionCallback(key, output, status);
break;
} else if (status == Status.PENDING) {
} else if (status == Status.IsPending) {
var p = this.intSerializer.deserialize(arrayBuf, arrIdx);
arrIdx += 4;
readrmwPendingContext[p] = key;
Expand All @@ -58,11 +58,11 @@

case MessageType.RMW:
var key = this.readrmwQueue.dequeue();
if (status == Status.OK || status == Status.NOTFOUND) {
if (status == Status.Found || status == Status.NotFound) {
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
this.functions.RMWCompletionCallback(key, output, status);
} else if (status == Status.PENDING) {
} else if (status == Status.IsPending) {
var p = this.intSerializer.deserialize(arrayBuf, arrIdx);
arrIdx += 4;
readrmwPendingContext[p] = key;
Expand All @@ -75,12 +75,12 @@
case MessageType.SubscribeKV:
var sid = this.intSerializer.deserialize(arrayBuf, arrIdx);
arrIdx += 4;
if (status == Status.OK || status == Status.NOTFOUND) {
if (status == Status.Found || status == Status.NotFound) {
var key = this.readrmwPendingContext[sid];
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
this.functions.SubscribeKVCompletionCallback(key, output, status);
} else if (status == Status.PENDING) {
} else if (status == Status.IsPending) {
var key = this.readrmwQueue.dequeue();
this.readrmwPendingContext[sid] = key;
} else {
Expand All @@ -93,12 +93,12 @@
case MessageType.Subscribe:
var sid = this.intSerializer.deserialize(arrayBuf, arrIdx);
arrIdx += 4;
if (status == Status.OK || status == Status.NOTFOUND) {
if (status == Status.Found || status == Status.NotFound) {
var key = this.pubsubPendingContext[sid];
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
this.functions.SubscribeCompletionCallback(key, output, status);
} else if (status == Status.PENDING) {
} else if (status == Status.IsPending) {
var key = this.pubsubQueue.dequeue();
this.pubsubPendingContext[sid] = key;
} else {
Expand Down Expand Up @@ -130,7 +130,7 @@
case MessageType.Read:
var key = this.readrmwPendingContext[p];
delete this.readrmwPendingContext[p];
if (status == Status.OK) {
if (status == Status.Found) {
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
}
Expand All @@ -140,7 +140,7 @@
case MessageType.RMW:
var key = this.readrmwPendingContext[p];
delete this.readrmwPendingContext[p];
if (status == Status.OK) {
if (status == Status.Found) {
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
}
Expand All @@ -149,7 +149,7 @@

case MessageType.SubscribeKV:
var key = this.readrmwPendingContext[p];
if (status == Status.OK) {
if (status == Status.Found) {
output = this.serializer.ReadOutput(arrayBuf, arrIdx);
arrIdx += output.length + 4;
}
Expand Down
8 changes: 4 additions & 4 deletions cs/remote/src/FASTER.client/JavascriptClient/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class JSUtils
}

const Status = {
OK: 0,
NOTFOUND: 1,
PENDING: 2,
ERROR: 3
Found: 0,
NotFound: 1,
IsPending: 2,
IsFaulted: 3
};

const MessageType = {
Expand Down
39 changes: 22 additions & 17 deletions cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,33 +273,28 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
}

var nextBufOffset = offset;
nextBufOffset += msglen;

while (fin == false)
{
nextBufOffset += msglen;

fin = ((buf[nextBufOffset]) & 0b10000000) != 0;
fin = (buf[nextBufOffset] & 0b10000000) != 0;

nextBufOffset++;
var nextMsgLen = buf[nextBufOffset] - 128; // & 0111 1111
int nextMsgLen = buf[nextBufOffset] & 0b01111111;

offset++;
nextBufOffset++;

if (nextMsgLen < 125)
{
nextBufOffset++;
offset++;
}
else if (nextMsgLen == 126)
{
offset += 3;
nextMsgLen = BitConverter.ToUInt16(new byte[] { buf[nextBufOffset + 1], buf[nextBufOffset] }, 0);
nextBufOffset += 2;
}
else if (nextMsgLen == 127)
{
offset += 9;
nextMsgLen = (int)BitConverter.ToUInt64(new byte[] { buf[nextBufOffset + 7], buf[nextBufOffset + 6], buf[nextBufOffset + 5], buf[nextBufOffset + 4], buf[nextBufOffset + 3], buf[nextBufOffset + 2], buf[nextBufOffset + 1], buf[nextBufOffset] }, 0);
nextBufOffset += 8;
}
Expand All @@ -311,10 +306,18 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
dataStart = nextBufOffset + 4
};
decoderInfoList.Add(nextDecoderInfo);
totalMsgLen += nextMsgLen;
offset += 4;
totalMsgLen += nextMsgLen; // Message length without the mask
nextBufOffset += 4; // 4 bytes of masking
nextBufOffset += nextMsgLen; // remaining message length }
}

if (msglen == 2)
{
this.Dispose();
return false;
}

offset = nextBufOffset;
completeWSCommand = true;

var decodedIndex = 0;
Expand All @@ -330,7 +333,6 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
}
}

offset += totalMsgLen;
readHead = offset;
}

Expand Down Expand Up @@ -573,7 +575,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r

byte* d = networkSender.GetResponseObjectHead();
var dend = networkSender.GetResponseObjectTail();
var dcurr = d + sizeof(int); // reserve space for size
var dcurr = d + 10 + sizeof(int); // reserve space for websocket header and size
byte* outputDcurr;

dcurr += BatchHeader.Size;
Expand Down Expand Up @@ -611,13 +613,16 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
}

// Send replies
var dstart = d + sizeof(int);
var dtemp = d + 10;
var dstart = dtemp + sizeof(int);
Unsafe.AsRef<BatchHeader>(dstart).NumMessages = 1;
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = 0;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)networkSender.GetResponseObjectHead() = -(payloadSize - sizeof(int));
networkSender.SendResponse(0, payloadSize);
int packetLen = (int)((dcurr - 10) - d);

CreateSendPacketHeader(ref d, packetLen);

*(int*)dtemp = (packetLen - sizeof(int));
networkSender.SendResponse((int) (d - networkSender.GetResponseObjectHead()), (int)(dcurr - d));
}


Expand Down