Skip to content

Commit

Permalink
feat: Added reliable window exhaust warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
TwoTenPvP committed Sep 10, 2019
1 parent cb3b5a1 commit dcbaea3
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 15 deletions.
52 changes: 39 additions & 13 deletions Ruffles/Channeling/Channels/ReliableSequencedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,37 @@ public HeapMemory HandlePoll()

return new ArraySegment<byte>(payload.Array, payload.Offset + 2, payload.Count - 2);
}
else if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) > 0 && !_receiveSequencer[sequence].Alive)
else if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) > 0)
{
// Alloc payload plus header memory
HeapMemory memory = memoryManager.AllocHeapMemory((uint)payload.Count - 2);
// Future packet

// Copy the payload
Buffer.BlockCopy(payload.Array, payload.Offset + 2, memory.Buffer, 0, payload.Count - 2);
PendingIncomingPacket unsafeIncoming = _receiveSequencer.GetUnsafe(sequence, out bool isSafe);

// Add to sequencer
_receiveSequencer[sequence] = new PendingIncomingPacket()
if (unsafeIncoming.Alive && !isSafe)
{
Alive = true,
Memory = memory,
Sequence = sequence
};
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Incoming packet window is exhausted. Disconnecting");

// Send ack
SendAck(sequence);
connection.Disconnect(false);
}
else if (!_receiveSequencer[sequence].Alive)
{
// Alloc payload plus header memory
HeapMemory memory = memoryManager.AllocHeapMemory((uint)payload.Count - 2);

// Copy the payload
Buffer.BlockCopy(payload.Array, payload.Offset + 2, memory.Buffer, 0, payload.Count - 2);

// Add to sequencer
_receiveSequencer[sequence] = new PendingIncomingPacket()
{
Alive = true,
Memory = memory,
Sequence = sequence
};

// Send ack
SendAck(sequence);
}
}

hasMore = false;
Expand All @@ -177,6 +190,19 @@ public HeapMemory[] CreateOutgoingMessage(ArraySegment<byte> payload, out byte h

lock (_lock)
{
PendingOutgoingPacket unsafeOutgoing = _sendSequencer.GetUnsafe(_lastOutboundSequenceNumber + 1, out bool isSafe);

if (unsafeOutgoing.Alive && !isSafe)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Outgoing packet window is exhausted. Disconnecting");

connection.Disconnect(false);

dealloc = false;
headerSize = 0;
return null;
}

// Increment the sequence number
_lastOutboundSequenceNumber++;

Expand Down
25 changes: 23 additions & 2 deletions Ruffles/Channeling/Channels/ReliableSequencedFragmentedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,17 @@ public HeapMemory HandlePoll()
{
// This is a packet after the last. One that is not yet completed

// If this is the first fragment we ever get, index the data.
if (!_receiveSequencer[sequence].Alive)
PendingIncomingPacket unsafeIncoming = _receiveSequencer.GetUnsafe(sequence, out bool isSafe);

if (unsafeIncoming.Alive && !isSafe)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Incoming packet window is exhausted. Disconnecting");

connection.Disconnect(false);
}
else if (!_receiveSequencer[sequence].Alive)
{
// If this is the first fragment we ever get, index the data.
int fragmentPointerSize = isFinal ? fragment + 1 : config.FragmentArrayBaseSize;

object[] fragmentPointers = memoryManager.AllocPointers((uint)fragmentPointerSize);
Expand Down Expand Up @@ -403,6 +411,19 @@ public HeapMemory[] CreateOutgoingMessage(ArraySegment<byte> payload, out byte h

lock (_lock)
{
PendingOutgoingPacket unsafeOutgoing = _sendSequencer.GetUnsafe(_lastOutboundSequenceNumber + 1, out bool isSafe);

if (unsafeOutgoing.Alive && !isSafe)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Outgoing packet window is exhausted. Disconnecting");

connection.Disconnect(false);

dealloc = false;
headerSize = 0;
return null;
}

// Increment the sequence number
_lastOutboundSequenceNumber++;

Expand Down
9 changes: 9 additions & 0 deletions Ruffles/Messaging/HeapableSlidingWindow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public HeapableSlidingWindow(int size, bool resetOld, byte wrapSize, MemoryManag
_memoryManager = memoryManager;
}

public T GetUnsafe(int index, out bool isSafe)
{
int arrayIndex = NumberUtils.WrapMod(index, _array.Length);

isSafe = _indexes[arrayIndex] == index;

return _array[arrayIndex];
}

public T this[int index]
{
get
Expand Down

0 comments on commit dcbaea3

Please sign in to comment.