Skip to content

Commit

Permalink
feat: Added native thread support
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Removes SocketManager and removes the internal polling methods from the public API
  • Loading branch information
TwoTenPvP committed Sep 9, 2019
1 parent 779e43d commit 46f4aaa
Show file tree
Hide file tree
Showing 13 changed files with 969 additions and 910 deletions.
236 changes: 127 additions & 109 deletions Ruffles/Channeling/Channels/ReliableChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public void DeAlloc(MemoryManager memoryManager)
private readonly SocketConfig config;
private readonly MemoryManager memoryManager;

// Lock for the channel, this allows sends and receives being done on different threads.
private readonly object _lock = new object();

internal ReliableChannel(byte channelId, Connection connection, SocketConfig config, MemoryManager memoryManager)
{
this.channelId = channelId;
Expand All @@ -70,49 +73,52 @@ public HeapMemory HandlePoll()
// Set the headerBytes
headerBytes = 2;

if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) <= 0 || _incomingAckedPackets.Contains(sequence))
lock (_lock)
{
// We have already acked this message. Ack again

connection.IncomingDuplicatePackets++;
connection.IncomingDuplicateUserBytes += (ulong)payload.Count - 2;
connection.IncomingDuplicateTotalBytes += (ulong)payload.Count + 2;
if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) <= 0 || _incomingAckedPackets.Contains(sequence))
{
// We have already acked this message. Ack again

SendAck(sequence);
connection.IncomingDuplicatePackets++;
connection.IncomingDuplicateUserBytes += (ulong)payload.Count - 2;
connection.IncomingDuplicateTotalBytes += (ulong)payload.Count + 2;

return null;
}
else if (sequence == _incomingLowestAckedSequence + 1)
{
// This is the "next" packet
SendAck(sequence);

do
return null;
}
else if (sequence == _incomingLowestAckedSequence + 1)
{
// Remove previous
_incomingAckedPackets.Remove(_incomingLowestAckedSequence);
// This is the "next" packet

_incomingLowestAckedSequence++;
}
while (_incomingAckedPackets.Contains((ushort)(_incomingLowestAckedSequence + 1)));
do
{
// Remove previous
_incomingAckedPackets.Remove(_incomingLowestAckedSequence);

// Ack the new message
SendAck(sequence);
_incomingLowestAckedSequence++;
}
while (_incomingAckedPackets.Contains((ushort)(_incomingLowestAckedSequence + 1)));

return new ArraySegment<byte>(payload.Array, payload.Offset + 2, payload.Count - 2);
}
else if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) > 0 && !_incomingAckedPackets.Contains(sequence))
{
// This is a future packet
// Ack the new message
SendAck(sequence);

// Add to sequencer
_incomingAckedPackets.Add(sequence);
return new ArraySegment<byte>(payload.Array, payload.Offset + 2, payload.Count - 2);
}
else if (SequencingUtils.Distance(sequence, _incomingLowestAckedSequence, sizeof(ushort)) > 0 && !_incomingAckedPackets.Contains(sequence))
{
// This is a future packet

SendAck(sequence);
// Add to sequencer
_incomingAckedPackets.Add(sequence);

return new ArraySegment<byte>(payload.Array, payload.Offset + 2, payload.Count - 2);
}
SendAck(sequence);

return null;
return new ArraySegment<byte>(payload.Array, payload.Offset + 2, payload.Count - 2);
}

return null;
}
}

private readonly HeapMemory[] SINGLE_MESSAGE_ARRAY = new HeapMemory[1];
Expand All @@ -127,44 +133,47 @@ public HeapMemory[] CreateOutgoingMessage(ArraySegment<byte> payload, out byte h
return null;
}

// Increment the sequence number
_lastOutboundSequenceNumber++;
lock (_lock)
{
// Increment the sequence number
_lastOutboundSequenceNumber++;

// Set header size
headerSize = 4;
// Set header size
headerSize = 4;

// Allocate the memory
HeapMemory memory = memoryManager.AllocHeapMemory((uint)payload.Count + 4);

// Write headers
memory.Buffer[0] = HeaderPacker.Pack((byte)MessageType.Data, false);
memory.Buffer[1] = channelId;
// Allocate the memory
HeapMemory memory = memoryManager.AllocHeapMemory((uint)payload.Count + 4);

// Write the sequence
memory.Buffer[2] = (byte)_lastOutboundSequenceNumber;
memory.Buffer[3] = (byte)(_lastOutboundSequenceNumber >> 8);
// Write headers
memory.Buffer[0] = HeaderPacker.Pack((byte)MessageType.Data, false);
memory.Buffer[1] = channelId;

// Copy the payload
Buffer.BlockCopy(payload.Array, payload.Offset, memory.Buffer, 4, payload.Count);
// Write the sequence
memory.Buffer[2] = (byte)_lastOutboundSequenceNumber;
memory.Buffer[3] = (byte)(_lastOutboundSequenceNumber >> 8);

// Add the memory to pending
_sendSequencer[_lastOutboundSequenceNumber] = new PendingOutgoingPacket()
{
Alive = true,
Sequence = _lastOutboundSequenceNumber,
Attempts = 1,
LastSent = DateTime.Now,
FirstSent = DateTime.Now,
Memory = memory
};
// Copy the payload
Buffer.BlockCopy(payload.Array, payload.Offset, memory.Buffer, 4, payload.Count);

// Tell the caller NOT to dealloc the memory, the channel needs it for resend purposes.
dealloc = false;
// Add the memory to pending
_sendSequencer[_lastOutboundSequenceNumber] = new PendingOutgoingPacket()
{
Alive = true,
Sequence = _lastOutboundSequenceNumber,
Attempts = 1,
LastSent = DateTime.Now,
FirstSent = DateTime.Now,
Memory = memory
};

// Tell the caller NOT to dealloc the memory, the channel needs it for resend purposes.
dealloc = false;

// Assign memory
SINGLE_MESSAGE_ARRAY[0] = memory;
// Assign memory
SINGLE_MESSAGE_ARRAY[0] = memory;

return SINGLE_MESSAGE_ARRAY;
return SINGLE_MESSAGE_ARRAY;
}
}

public void HandleAck(ArraySegment<byte> payload)
Expand Down Expand Up @@ -199,78 +208,87 @@ public void HandleAck(ArraySegment<byte> payload)

private void HandleAck(ushort sequence)
{
if (_sendSequencer[sequence].Alive)
lock (_lock)
{
// Add statistics
connection.OutgoingConfirmedPackets++;
if (_sendSequencer[sequence].Alive)
{
// Add statistics
connection.OutgoingConfirmedPackets++;

// Dealloc the memory held by the sequencer
memoryManager.DeAlloc(_sendSequencer[sequence].Memory);
// Dealloc the memory held by the sequencer
memoryManager.DeAlloc(_sendSequencer[sequence].Memory);

// TODO: Remove roundtripping from channeled packets and make specific ping-pong packets
// TODO: Remove roundtripping from channeled packets and make specific ping-pong packets

// Get the roundtrp
ulong roundtrip = (ulong)Math.Round((DateTime.Now - _sendSequencer[sequence].FirstSent).TotalMilliseconds);
// Get the roundtrp
ulong roundtrip = (ulong)Math.Round((DateTime.Now - _sendSequencer[sequence].FirstSent).TotalMilliseconds);

// Report to the connection
connection.AddRoundtripSample(roundtrip);
// Report to the connection
connection.AddRoundtripSample(roundtrip);

// Kill the packet
_sendSequencer[sequence] = new PendingOutgoingPacket()
{
Alive = false,
Sequence = sequence
};
}
// Kill the packet
_sendSequencer[sequence] = new PendingOutgoingPacket()
{
Alive = false,
Sequence = sequence
};
}

for (ushort i = sequence; _sendSequencer[i].Alive; i++)
{
_incomingLowestAckedSequence = i;
for (ushort i = sequence; _sendSequencer[i].Alive; i++)
{
_incomingLowestAckedSequence = i;
}
}
}

public void InternalUpdate()
{
long distance = SequencingUtils.Distance(_lastOutboundSequenceNumber, _incomingLowestAckedSequence, sizeof(ushort));

for (ushort i = _incomingLowestAckedSequence; i < _incomingLowestAckedSequence + distance; i++)
lock (_lock)
{
if (_sendSequencer[i].Alive)
long distance = SequencingUtils.Distance(_lastOutboundSequenceNumber, _incomingLowestAckedSequence, sizeof(ushort));

for (ushort i = _incomingLowestAckedSequence; i < _incomingLowestAckedSequence + distance; i++)
{
if (_sendSequencer[i].Attempts > config.ReliabilityMaxResendAttempts)
{
// If they don't ack the message, disconnect them
connection.Disconnect(false);
}
else if ((DateTime.Now - _sendSequencer[i].LastSent).TotalMilliseconds > connection.Roundtrip * config.ReliabilityResendRoundtripMultiplier)
if (_sendSequencer[i].Alive)
{
_sendSequencer[i] = new PendingOutgoingPacket()
if (_sendSequencer[i].Attempts > config.ReliabilityMaxResendAttempts)
{
Alive = true,
Attempts = (ushort)(_sendSequencer[i].Attempts + 1),
LastSent = DateTime.Now,
FirstSent = _sendSequencer[i].FirstSent,
Memory = _sendSequencer[i].Memory,
Sequence = i
};

connection.SendRaw(new ArraySegment<byte>(_sendSequencer[i].Memory.Buffer, (int)_sendSequencer[i].Memory.VirtualOffset, (int)_sendSequencer[i].Memory.VirtualCount), false, 4);

connection.OutgoingResentPackets++;
// If they don't ack the message, disconnect them
connection.Disconnect(false);
}
else if ((DateTime.Now - _sendSequencer[i].LastSent).TotalMilliseconds > connection.Roundtrip * config.ReliabilityResendRoundtripMultiplier)
{
_sendSequencer[i] = new PendingOutgoingPacket()
{
Alive = true,
Attempts = (ushort)(_sendSequencer[i].Attempts + 1),
LastSent = DateTime.Now,
FirstSent = _sendSequencer[i].FirstSent,
Memory = _sendSequencer[i].Memory,
Sequence = i
};

connection.SendRaw(new ArraySegment<byte>(_sendSequencer[i].Memory.Buffer, (int)_sendSequencer[i].Memory.VirtualOffset, (int)_sendSequencer[i].Memory.VirtualCount), false, 4);

connection.OutgoingResentPackets++;
}
}
}
}
}

public void Reset()
{
// Clear all incoming states
_incomingAckedPackets.Clear();
_incomingLowestAckedSequence = 0;
lock (_lock)
{
// Clear all incoming states
_incomingAckedPackets.Clear();
_incomingLowestAckedSequence = 0;

// Clear all outgoing states
_sendSequencer.Release();
_lastOutboundSequenceNumber = 0;
// Clear all outgoing states
_sendSequencer.Release();
_lastOutboundSequenceNumber = 0;
}
}

private void SendAck(ushort sequence)
Expand Down
Loading

0 comments on commit 46f4aaa

Please sign in to comment.