From adab0f1a4312273973430956c3af4290e1ad9e4d Mon Sep 17 00:00:00 2001 From: Rohan Kadekodi Date: Tue, 22 Feb 2022 20:28:32 -0600 Subject: [PATCH 1/4] Fixed nits in websockets: * Clean disconnection * Support correction for large packets --- .../FASTER.server/WebsocketServerSession.cs | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/cs/remote/src/FASTER.server/WebsocketServerSession.cs b/cs/remote/src/FASTER.server/WebsocketServerSession.cs index db71f8929..43f416689 100644 --- a/cs/remote/src/FASTER.server/WebsocketServerSession.cs +++ b/cs/remote/src/FASTER.server/WebsocketServerSession.cs @@ -273,33 +273,28 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset) } var nextBufOffset = offset; + nextBufOffset += msglen; while (fin == false) { - nextBufOffset += msglen; - - fin = ((buf[nextBufOffset]) & 0b10000000) != 0; + fin = (buf[nextBufOffset] & 0b10000000) != 0; nextBufOffset++; - var nextMsgLen = buf[nextBufOffset] - 128; // & 0111 1111 + int nextMsgLen = buf[nextBufOffset] & 0b01111111; - offset++; nextBufOffset++; if (nextMsgLen < 125) { nextBufOffset++; - offset++; } else if (nextMsgLen == 126) { - offset += 3; nextMsgLen = BitConverter.ToUInt16(new byte[] { buf[nextBufOffset + 1], buf[nextBufOffset] }, 0); nextBufOffset += 2; } else if (nextMsgLen == 127) { - offset += 9; nextMsgLen = (int)BitConverter.ToUInt64(new byte[] { buf[nextBufOffset + 7], buf[nextBufOffset + 6], buf[nextBufOffset + 5], buf[nextBufOffset + 4], buf[nextBufOffset + 3], buf[nextBufOffset + 2], buf[nextBufOffset + 1], buf[nextBufOffset] }, 0); nextBufOffset += 8; } @@ -311,10 +306,17 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset) dataStart = nextBufOffset + 4 }; decoderInfoList.Add(nextDecoderInfo); - totalMsgLen += nextMsgLen; - offset += 4; + nextBufOffset += 4; // 4 bytes of masking + nextBufOffset += nextMsgLen; // remaining message length } + } + + if (msglen == 2) + { + this.Dispose(); + return false; } + offset = nextBufOffset; completeWSCommand = true; var decodedIndex = 0; @@ -330,7 +332,6 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset) } } - offset += totalMsgLen; readHead = offset; } @@ -573,7 +574,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r byte* d = networkSender.GetResponseObjectHead(); var dend = networkSender.GetResponseObjectTail(); - var dcurr = d + sizeof(int); // reserve space for size + var dcurr = d + 10 + sizeof(int); // reserve space for websocket header and size byte* outputDcurr; dcurr += BatchHeader.Size; @@ -611,13 +612,16 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r } // Send replies - var dstart = d + sizeof(int); + var dtemp = d + 10; + var dstart = dtemp + sizeof(int); Unsafe.AsRef(dstart).NumMessages = 1; Unsafe.AsRef(dstart).SeqNo = 0; - int payloadSize = (int)(dcurr - d); - // Set packet size in header - *(int*)networkSender.GetResponseObjectHead() = -(payloadSize - sizeof(int)); - networkSender.SendResponse(0, payloadSize); + int packetLen = (int)((dcurr - 10) - d); + + CreateSendPacketHeader(ref d, packetLen); + + *(int*)dtemp = (packetLen - sizeof(int)); + networkSender.SendResponse((int) (d - networkSender.GetResponseObjectHead()), (int)(dcurr - d)); } From 1b5ffeea8bceac5a1087b438211b02cec70d9227 Mon Sep 17 00:00:00 2001 From: Rohan Kadekodi Date: Tue, 22 Feb 2022 20:35:48 -0600 Subject: [PATCH 2/4] Small nit for correcting message length --- cs/remote/src/FASTER.server/WebsocketServerSession.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/cs/remote/src/FASTER.server/WebsocketServerSession.cs b/cs/remote/src/FASTER.server/WebsocketServerSession.cs index 43f416689..845a7e8ff 100644 --- a/cs/remote/src/FASTER.server/WebsocketServerSession.cs +++ b/cs/remote/src/FASTER.server/WebsocketServerSession.cs @@ -306,6 +306,7 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset) dataStart = nextBufOffset + 4 }; decoderInfoList.Add(nextDecoderInfo); + totalMsgLen += nextMsgLen; // Message length without the mask nextBufOffset += 4; // 4 bytes of masking nextBufOffset += nextMsgLen; // remaining message length } } From 13e3610ae53740157c45863805c7eaae78702ef4 Mon Sep 17 00:00:00 2001 From: Rohan Kadekodi Date: Tue, 22 Feb 2022 22:09:56 -0600 Subject: [PATCH 3/4] changed status to new FASTER status codes --- cs/remote/samples/WebClient/FASTERFunctions.js | 8 ++++---- .../JavascriptClient/ClientSession.js | 14 +++++++------- .../src/FASTER.client/JavascriptClient/Utils.js | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cs/remote/samples/WebClient/FASTERFunctions.js b/cs/remote/samples/WebClient/FASTERFunctions.js index 6ee2501a3..00fa81858 100644 --- a/cs/remote/samples/WebClient/FASTERFunctions.js +++ b/cs/remote/samples/WebClient/FASTERFunctions.js @@ -11,14 +11,14 @@ class FASTERFunctions extends CallbackFunctionsBase { } ReadCompletionCallback(keyBytes, outputBytes, status) { - if (status == Status.OK) { + if (status == Status.FOUND) { var output = deserialize(outputBytes, 0, outputBytes.length); writeToScreen(" value: " + output + " "); } } UpsertCompletionCallback(keyBytes, valueBytes, status) { - if (status == Status.OK) { + if (status == Status.FOUND) { writeToScreen(" PUT OK "); } } @@ -29,7 +29,7 @@ class FASTERFunctions extends CallbackFunctionsBase { SubscribeKVCompletionCallback(keyBytes, outputBytes, status) { - if (status == Status.OK) { + if (status == Status.FOUND) { var key = deserialize(keyBytes, 0, keyBytes.length); var output = deserialize(outputBytes, 0, outputBytes.length); writeToScreen(" subscribed key: " + key + " value: " + output + " "); @@ -38,7 +38,7 @@ class FASTERFunctions extends CallbackFunctionsBase { SubscribeCompletionCallback(keyBytes, valueBytes, status) { - if (status == Status.OK) { + if (status == Status.FOUND) { var key = deserialize(keyBytes, 0, keyBytes.length); var value = deserialize(valueBytes, 0, valueBytes.length); writeToScreen(" subscribed key: " + key + " value: " + value + " "); diff --git a/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js b/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js index 2e6270fb3..dcf83f255 100644 --- a/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js +++ b/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js @@ -32,7 +32,7 @@ switch (op) { case MessageType.Read: var key = this.readrmwQueue.dequeue(); - if (status == Status.OK) { + if (status == Status.FOUND) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.ReadCompletionCallback(key, output, status); @@ -58,7 +58,7 @@ case MessageType.RMW: var key = this.readrmwQueue.dequeue(); - if (status == Status.OK || status == Status.NOTFOUND) { + if (status == Status.FOUND || status == Status.NOTFOUND) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.RMWCompletionCallback(key, output, status); @@ -75,7 +75,7 @@ case MessageType.SubscribeKV: var sid = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; - if (status == Status.OK || status == Status.NOTFOUND) { + if (status == Status.FOUND || status == Status.NOTFOUND) { var key = this.readrmwPendingContext[sid]; output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; @@ -93,7 +93,7 @@ case MessageType.Subscribe: var sid = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; - if (status == Status.OK || status == Status.NOTFOUND) { + if (status == Status.FOUND || status == Status.NOTFOUND) { var key = this.pubsubPendingContext[sid]; output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; @@ -130,7 +130,7 @@ case MessageType.Read: var key = this.readrmwPendingContext[p]; delete this.readrmwPendingContext[p]; - if (status == Status.OK) { + if (status == Status.FOUND) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } @@ -140,7 +140,7 @@ case MessageType.RMW: var key = this.readrmwPendingContext[p]; delete this.readrmwPendingContext[p]; - if (status == Status.OK) { + if (status == Status.FOUND) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } @@ -149,7 +149,7 @@ case MessageType.SubscribeKV: var key = this.readrmwPendingContext[p]; - if (status == Status.OK) { + if (status == Status.FOUND) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } diff --git a/cs/remote/src/FASTER.client/JavascriptClient/Utils.js b/cs/remote/src/FASTER.client/JavascriptClient/Utils.js index af9622ea0..4a2f15992 100644 --- a/cs/remote/src/FASTER.client/JavascriptClient/Utils.js +++ b/cs/remote/src/FASTER.client/JavascriptClient/Utils.js @@ -27,7 +27,7 @@ class JSUtils } const Status = { - OK: 0, + FOUND: 0, NOTFOUND: 1, PENDING: 2, ERROR: 3 From d833f9ecbb097ba74a9cc9d69447b043f508b079 Mon Sep 17 00:00:00 2001 From: Rohan Kadekodi Date: Thu, 24 Feb 2022 23:35:35 -0600 Subject: [PATCH 4/4] Modified status codes --- .../samples/WebClient/FASTERFunctions.js | 8 +++---- .../JavascriptClient/ClientSession.js | 22 +++++++++---------- .../FASTER.client/JavascriptClient/Utils.js | 8 +++---- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cs/remote/samples/WebClient/FASTERFunctions.js b/cs/remote/samples/WebClient/FASTERFunctions.js index 00fa81858..c67c91c08 100644 --- a/cs/remote/samples/WebClient/FASTERFunctions.js +++ b/cs/remote/samples/WebClient/FASTERFunctions.js @@ -11,14 +11,14 @@ class FASTERFunctions extends CallbackFunctionsBase { } ReadCompletionCallback(keyBytes, outputBytes, status) { - if (status == Status.FOUND) { + if (status == Status.Found) { var output = deserialize(outputBytes, 0, outputBytes.length); writeToScreen(" value: " + output + " "); } } UpsertCompletionCallback(keyBytes, valueBytes, status) { - if (status == Status.FOUND) { + if (status == Status.Found) { writeToScreen(" PUT OK "); } } @@ -29,7 +29,7 @@ class FASTERFunctions extends CallbackFunctionsBase { SubscribeKVCompletionCallback(keyBytes, outputBytes, status) { - if (status == Status.FOUND) { + if (status == Status.Found) { var key = deserialize(keyBytes, 0, keyBytes.length); var output = deserialize(outputBytes, 0, outputBytes.length); writeToScreen(" subscribed key: " + key + " value: " + output + " "); @@ -38,7 +38,7 @@ class FASTERFunctions extends CallbackFunctionsBase { SubscribeCompletionCallback(keyBytes, valueBytes, status) { - if (status == Status.FOUND) { + if (status == Status.Found) { var key = deserialize(keyBytes, 0, keyBytes.length); var value = deserialize(valueBytes, 0, valueBytes.length); writeToScreen(" subscribed key: " + key + " value: " + value + " "); diff --git a/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js b/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js index dcf83f255..73a8ef1cf 100644 --- a/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js +++ b/cs/remote/src/FASTER.client/JavascriptClient/ClientSession.js @@ -32,12 +32,12 @@ switch (op) { case MessageType.Read: var key = this.readrmwQueue.dequeue(); - if (status == Status.FOUND) { + if (status == Status.Found) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.ReadCompletionCallback(key, output, status); break; - } else if (status == Status.PENDING) { + } else if (status == Status.IsPending) { var p = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; readrmwPendingContext[p] = key; @@ -58,11 +58,11 @@ case MessageType.RMW: var key = this.readrmwQueue.dequeue(); - if (status == Status.FOUND || status == Status.NOTFOUND) { + if (status == Status.Found || status == Status.NotFound) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.RMWCompletionCallback(key, output, status); - } else if (status == Status.PENDING) { + } else if (status == Status.IsPending) { var p = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; readrmwPendingContext[p] = key; @@ -75,12 +75,12 @@ case MessageType.SubscribeKV: var sid = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; - if (status == Status.FOUND || status == Status.NOTFOUND) { + if (status == Status.Found || status == Status.NotFound) { var key = this.readrmwPendingContext[sid]; output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.SubscribeKVCompletionCallback(key, output, status); - } else if (status == Status.PENDING) { + } else if (status == Status.IsPending) { var key = this.readrmwQueue.dequeue(); this.readrmwPendingContext[sid] = key; } else { @@ -93,12 +93,12 @@ case MessageType.Subscribe: var sid = this.intSerializer.deserialize(arrayBuf, arrIdx); arrIdx += 4; - if (status == Status.FOUND || status == Status.NOTFOUND) { + if (status == Status.Found || status == Status.NotFound) { var key = this.pubsubPendingContext[sid]; output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; this.functions.SubscribeCompletionCallback(key, output, status); - } else if (status == Status.PENDING) { + } else if (status == Status.IsPending) { var key = this.pubsubQueue.dequeue(); this.pubsubPendingContext[sid] = key; } else { @@ -130,7 +130,7 @@ case MessageType.Read: var key = this.readrmwPendingContext[p]; delete this.readrmwPendingContext[p]; - if (status == Status.FOUND) { + if (status == Status.Found) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } @@ -140,7 +140,7 @@ case MessageType.RMW: var key = this.readrmwPendingContext[p]; delete this.readrmwPendingContext[p]; - if (status == Status.FOUND) { + if (status == Status.Found) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } @@ -149,7 +149,7 @@ case MessageType.SubscribeKV: var key = this.readrmwPendingContext[p]; - if (status == Status.FOUND) { + if (status == Status.Found) { output = this.serializer.ReadOutput(arrayBuf, arrIdx); arrIdx += output.length + 4; } diff --git a/cs/remote/src/FASTER.client/JavascriptClient/Utils.js b/cs/remote/src/FASTER.client/JavascriptClient/Utils.js index 4a2f15992..370d0290e 100644 --- a/cs/remote/src/FASTER.client/JavascriptClient/Utils.js +++ b/cs/remote/src/FASTER.client/JavascriptClient/Utils.js @@ -27,10 +27,10 @@ class JSUtils } const Status = { - FOUND: 0, - NOTFOUND: 1, - PENDING: 2, - ERROR: 3 + Found: 0, + NotFound: 1, + IsPending: 2, + IsFaulted: 3 }; const MessageType = {