From eea90e63256c14c27f420594238332168bc000c4 Mon Sep 17 00:00:00 2001 From: rohankadekodi-msr <69916400+rohankadekodi-msr@users.noreply.github.com> Date: Mon, 21 Feb 2022 17:20:44 -0600 Subject: [PATCH] [C#] bug-fix for publish in websockets (#649) * Sending publish with websocket headers for websocket server session * Handling clean disconnection * Fixed a nit for large images transfer that was causing problems * simplified code a bit and fixed a nit Co-authored-by: Rohan Kadekodi --- .../FASTER.server/WebsocketServerSession.cs | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/cs/remote/src/FASTER.server/WebsocketServerSession.cs b/cs/remote/src/FASTER.server/WebsocketServerSession.cs index e8edce724..a8700cbec 100644 --- a/cs/remote/src/FASTER.server/WebsocketServerSession.cs +++ b/cs/remote/src/FASTER.server/WebsocketServerSession.cs @@ -275,33 +275,28 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) } var nextBufOffset = offset; + nextBufOffset += msglen; while (fin == false) { - nextBufOffset += msglen; - - fin = ((buf[nextBufOffset]) & 0b10000000) != 0; + fin = (buf[nextBufOffset] & 0b10000000) != 0; nextBufOffset++; - var nextMsgLen = buf[nextBufOffset] - 128; // & 0111 1111 + int nextMsgLen = buf[nextBufOffset] & 0b01111111; // & 0111 1111 - offset++; nextBufOffset++; if (nextMsgLen < 125) { nextBufOffset++; - offset++; } else if (nextMsgLen == 126) { - offset += 3; nextMsgLen = BitConverter.ToUInt16(new byte[] { buf[nextBufOffset + 1], buf[nextBufOffset] }, 0); nextBufOffset += 2; } else if (nextMsgLen == 127) { - offset += 9; nextMsgLen = (int)BitConverter.ToUInt64(new byte[] { buf[nextBufOffset + 7], buf[nextBufOffset + 6], buf[nextBufOffset + 5], buf[nextBufOffset + 4], buf[nextBufOffset + 3], buf[nextBufOffset + 2], buf[nextBufOffset + 1], buf[nextBufOffset] }, 0); nextBufOffset += 8; } @@ -311,10 +306,18 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) nextDecoderInfo.maskStart = nextBufOffset; nextDecoderInfo.dataStart = nextBufOffset + 4; decoderInfoList.Add(nextDecoderInfo); - totalMsgLen += nextMsgLen; - offset += 4; + totalMsgLen += nextMsgLen; // Message length without the mask + nextBufOffset += 4; // 4 bytes of masking + nextBufOffset += nextMsgLen; // remaining message length + } + + if (msglen == 2) + { + this.Dispose(); + return false; } + offset = nextBufOffset; completeWSCommand = true; var decodedIndex = 0; @@ -330,7 +333,7 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) } } - offset += totalMsgLen; + //offset += totalMsgLen; readHead = offset; } @@ -575,7 +578,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r byte* d = respObj.bufferPtr; var dend = d + respObj.buffer.Length; - var dcurr = d + sizeof(int); // reserve space for size + var dcurr = d + 10 + sizeof(int); // reserve space for websocket header and size byte* outputDcurr; dcurr += BatchHeader.Size; @@ -613,15 +616,18 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r } // Send replies - var dstart = d + sizeof(int); + var dtemp = d + 10; + var dstart = dtemp + sizeof(int); Unsafe.AsRef(dstart).NumMessages = 1; Unsafe.AsRef(dstart).SeqNo = 0; - int payloadSize = (int)(dcurr - d); - // Set packet size in header - *(int*)respObj.bufferPtr = -(payloadSize - sizeof(int)); + int packetLen = (int)((dcurr - 10) - d); + + CreateSendPacketHeader(ref d, packetLen); + + *(int*)dtemp = (packetLen - sizeof(int)); try { - messageManager.Send(socket, respObj, 0, payloadSize); + messageManager.Send(socket, respObj, (int)(d - respObj.bufferPtr), (int)(dcurr - d)); } catch {