Skip to content

Commit

Permalink
[C#] bug-fix for publish in websockets (#649)
Browse files Browse the repository at this point in the history
* Sending publish with websocket headers for websocket server session
* Handling clean disconnection
* Fixed a nit for large images transfer that was causing problems
* simplified code a bit and fixed a nit

Co-authored-by: Rohan Kadekodi <kadekodirohan@gmail.com>
  • Loading branch information
rohankadekodi-msr and rohankadekodi authored Feb 21, 2022
1 parent 2509672 commit eea90e6
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,33 +275,28 @@ private unsafe bool ProcessBatch(byte[] buf, int offset)
}

var nextBufOffset = offset;
nextBufOffset += msglen;

while (fin == false)
{
nextBufOffset += msglen;

fin = ((buf[nextBufOffset]) & 0b10000000) != 0;
fin = (buf[nextBufOffset] & 0b10000000) != 0;

nextBufOffset++;
var nextMsgLen = buf[nextBufOffset] - 128; // & 0111 1111
int nextMsgLen = buf[nextBufOffset] & 0b01111111; // & 0111 1111

offset++;
nextBufOffset++;

if (nextMsgLen < 125)
{
nextBufOffset++;
offset++;
}
else if (nextMsgLen == 126)
{
offset += 3;
nextMsgLen = BitConverter.ToUInt16(new byte[] { buf[nextBufOffset + 1], buf[nextBufOffset] }, 0);
nextBufOffset += 2;
}
else if (nextMsgLen == 127)
{
offset += 9;
nextMsgLen = (int)BitConverter.ToUInt64(new byte[] { buf[nextBufOffset + 7], buf[nextBufOffset + 6], buf[nextBufOffset + 5], buf[nextBufOffset + 4], buf[nextBufOffset + 3], buf[nextBufOffset + 2], buf[nextBufOffset + 1], buf[nextBufOffset] }, 0);
nextBufOffset += 8;
}
Expand All @@ -311,10 +306,18 @@ private unsafe bool ProcessBatch(byte[] buf, int offset)
nextDecoderInfo.maskStart = nextBufOffset;
nextDecoderInfo.dataStart = nextBufOffset + 4;
decoderInfoList.Add(nextDecoderInfo);
totalMsgLen += nextMsgLen;
offset += 4;
totalMsgLen += nextMsgLen; // Message length without the mask
nextBufOffset += 4; // 4 bytes of masking
nextBufOffset += nextMsgLen; // remaining message length
}

if (msglen == 2)
{
this.Dispose();
return false;
}

offset = nextBufOffset;
completeWSCommand = true;

var decodedIndex = 0;
Expand All @@ -330,7 +333,7 @@ private unsafe bool ProcessBatch(byte[] buf, int offset)
}
}

offset += totalMsgLen;
//offset += totalMsgLen;
readHead = offset;
}

Expand Down Expand Up @@ -575,7 +578,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r

byte* d = respObj.bufferPtr;
var dend = d + respObj.buffer.Length;
var dcurr = d + sizeof(int); // reserve space for size
var dcurr = d + 10 + sizeof(int); // reserve space for websocket header and size
byte* outputDcurr;

dcurr += BatchHeader.Size;
Expand Down Expand Up @@ -613,15 +616,18 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
}

// Send replies
var dstart = d + sizeof(int);
var dtemp = d + 10;
var dstart = dtemp + sizeof(int);
Unsafe.AsRef<BatchHeader>(dstart).NumMessages = 1;
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = 0;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)respObj.bufferPtr = -(payloadSize - sizeof(int));
int packetLen = (int)((dcurr - 10) - d);

CreateSendPacketHeader(ref d, packetLen);

*(int*)dtemp = (packetLen - sizeof(int));
try
{
messageManager.Send(socket, respObj, 0, payloadSize);
messageManager.Send(socket, respObj, (int)(d - respObj.bufferPtr), (int)(dcurr - d));
}
catch
{
Expand Down

0 comments on commit eea90e6

Please sign in to comment.