Skip to content

Commit

Permalink
fix: Made message merger thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
TwoTenPvP committed Sep 13, 2019
1 parent 49b0817 commit 435ce17
Showing 1 changed file with 92 additions and 78 deletions.
170 changes: 92 additions & 78 deletions Ruffles/Messaging/MessageMerger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal class MessageMerger
private ushort _headerBytes;
private ushort _packets;

private readonly object _lock = new object();

internal MessageMerger(int maxSize, ulong flushDelay)
{
_buffer = new byte[maxSize];
Expand All @@ -25,65 +27,74 @@ internal MessageMerger(int maxSize, ulong flushDelay)

internal void Clear()
{
_lastFlushTime = DateTime.Now;
_position = 1;
_headerBytes = 1;
_packets = 0;
lock (_lock)
{
_lastFlushTime = DateTime.Now;
_position = 1;
_headerBytes = 1;
_packets = 0;
}
}

internal bool TryWrite(ArraySegment<byte> payload, ushort headerBytes)
{
if (payload.Count + _position + 2 > _buffer.Length)
{
// Wont fit
return false;
}
else
lock (_lock)
{
// TODO: VarInt
// Write the segment size
_buffer[_position] = (byte)(payload.Count);
_buffer[_position + 1] = (byte)(payload.Count >> 8);
if (payload.Count + _position + 2 > _buffer.Length)
{
// Wont fit
return false;
}
else
{
// TODO: VarInt
// Write the segment size
_buffer[_position] = (byte)(payload.Count);
_buffer[_position + 1] = (byte)(payload.Count >> 8);

// Copy the payload with the header
Buffer.BlockCopy(payload.Array, payload.Offset, _buffer, _position + 2, payload.Count);
// Copy the payload with the header
Buffer.BlockCopy(payload.Array, payload.Offset, _buffer, _position + 2, payload.Count);

// Update the position
_position += 2 + payload.Count;
// Update the position
_position += 2 + payload.Count;

// Update the amount of header bytes
_headerBytes += headerBytes;
// Update the amount of header bytes
_headerBytes += headerBytes;

// Update the amount of packets
_packets++;
// Update the amount of packets
_packets++;

return true;
return true;
}
}
}

internal ArraySegment<byte>? TryFlush(out ushort headerBytes)
{
if (_position > 1 && (DateTime.Now - _lastFlushTime).TotalMilliseconds > _flushDelay)
lock (_lock)
{
// Its time to flush
if (_position > 1 && (DateTime.Now - _lastFlushTime).TotalMilliseconds > _flushDelay)
{
// Its time to flush

// Save the size
int flushSize = _position;
// Save the size
int flushSize = _position;

headerBytes = _headerBytes;
headerBytes = _headerBytes;

// Reset values
_position = 1;
_lastFlushTime = DateTime.Now;
_headerBytes = 1;
_packets = 0;
// Reset values
_position = 1;
_lastFlushTime = DateTime.Now;
_headerBytes = 1;
_packets = 0;

return new ArraySegment<byte>(_buffer, 0, flushSize);
}
return new ArraySegment<byte>(_buffer, 0, flushSize);
}

headerBytes = 0;
headerBytes = 0;

return null;
return null;
}
}


Expand All @@ -93,56 +104,59 @@ internal bool TryWrite(ArraySegment<byte> payload, ushort headerBytes)
// DONT MAKE STATIC FOR THREAD SAFETY.
internal List<ArraySegment<byte>> Unpack(ArraySegment<byte> payload)
{
// TODO: VarInt
if (payload.Count < 3)
{
// Payload is too small
return null;
}

// Clear the segments list
_unpackSegments.Clear();

// The offset for walking the buffer
int packetOffset = 0;

while (true)
lock (_lock)
{
if (payload.Count < packetOffset + 2)
{
// No more data to be read
return _unpackSegments;
}

// TODO: VarInt
// Read the size
ushort size = (ushort)(payload.Array[payload.Offset + packetOffset] | (ushort)(payload.Array[payload.Offset + packetOffset + 1] << 8));

if (size < 1)
if (payload.Count < 3)
{
// The size is too small. Doesnt fit the header
return _unpackSegments;
// Payload is too small
return null;
}

// Make sure the size can even fit
if (payload.Count < (packetOffset + 2 + size))
{
// Payload is too small to fit the claimed size. Exit
return _unpackSegments;
}
// Clear the segments list
_unpackSegments.Clear();

// Read the header
HeaderPacker.Unpack(payload.Array[payload.Offset + packetOffset + 2], out byte type, out bool fragment);
// The offset for walking the buffer
int packetOffset = 0;

// Prevent merging a merge
if (type != (byte)MessageType.Merge)
while (true)
{
// Add the new segment
_unpackSegments.Add(new ArraySegment<byte>(payload.Array, payload.Offset + packetOffset + 2, size));
if (payload.Count < packetOffset + 2)
{
// No more data to be read
return _unpackSegments;
}

// TODO: VarInt
// Read the size
ushort size = (ushort)(payload.Array[payload.Offset + packetOffset] | (ushort)(payload.Array[payload.Offset + packetOffset + 1] << 8));

if (size < 1)
{
// The size is too small. Doesnt fit the header
return _unpackSegments;
}

// Make sure the size can even fit
if (payload.Count < (packetOffset + 2 + size))
{
// Payload is too small to fit the claimed size. Exit
return _unpackSegments;
}

// Read the header
HeaderPacker.Unpack(payload.Array[payload.Offset + packetOffset + 2], out byte type, out bool fragment);

// Prevent merging a merge
if (type != (byte)MessageType.Merge)
{
// Add the new segment
_unpackSegments.Add(new ArraySegment<byte>(payload.Array, payload.Offset + packetOffset + 2, size));
}

// Increment the packetOffset
packetOffset += 2 + size;
}

// Increment the packetOffset
packetOffset += 2 + size;
}
}
}
Expand Down

0 comments on commit 435ce17

Please sign in to comment.