Skip to content

Commit

Permalink
fix: Fixed socket scheduling
Browse files Browse the repository at this point in the history
BREAKING CHANGE: All events should now be recycled, connections no longer have to be recycled
  • Loading branch information
TwoTenPvP committed Sep 30, 2019
1 parent 435ce17 commit f4196cd
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 97 deletions.
15 changes: 4 additions & 11 deletions Ruffles.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,10 @@ public static void Main(string[] args)
clientId = serverEvent.Connection.Id;
clientConnection = serverEvent.Connection;
}

if (serverEvent.Type == NetworkEventType.Disconnect || serverEvent.Type == NetworkEventType.Timeout)
{
serverEvent.Connection.Recycle();
}
}

serverEvent.Recycle();

if (clientEvent.Type != NetworkEventType.Nothing)
{
if (clientEvent.Type != NetworkEventType.Data)
Expand All @@ -132,15 +129,11 @@ public static void Main(string[] args)
{
messagesReceived++;
Console.WriteLine("Got message: \"" + Encoding.ASCII.GetString(clientEvent.Data.Array, clientEvent.Data.Offset, clientEvent.Data.Count) + "\"");
clientEvent.Recycle();
}

if (clientEvent.Type == NetworkEventType.Disconnect || clientEvent.Type == NetworkEventType.Timeout)
{
clientEvent.Connection.Recycle();
}
}

clientEvent.Recycle();

if ((DateTime.Now - started).TotalSeconds > 10 && (DateTime.Now - lastSent).TotalSeconds >= (1f / 1))
{
byte[] helloReliable = Encoding.ASCII.GetBytes("This message was sent over a reliable channel" + messageCounter);
Expand Down
7 changes: 1 addition & 6 deletions Ruffles/Configuration/SocketConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ public class SocketConfig
// Performance
/// <summary>
/// The max socket block time in milliseconds. This will affect how long the internal loop will block.
/// The amount is double for dual mode sockets.
/// </summary>
public ushort MaxSocketBlockMilliseconds = 5;
/// <summary>
/// The minimum delay for running the internal resend and timeout logic.
/// </summary>
public ulong MinConnectionPollDelay = 50;
public ushort SocketPollTime = 50;
/// <summary>
/// Whether or not to reuse connections. Disabling this has a impact on memory and CPU.
/// If this is enabled, all connections has to be manually recycled by the user after receiving the disconnect or timeout events.
Expand Down
11 changes: 0 additions & 11 deletions Ruffles/Connections/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,6 @@ internal void Reset()
IncomingDuplicateTotalBytes = 0;
IncomingDuplicateUserBytes = 0;

}

/// <summary>
/// Recycle this connection so that it can be reused by Ruffles.
/// </summary>
public void Recycle()
{
if (Dead && !Recycled)
{
Recycled = true;
}
}

#if ALLOW_CONNECTION_STUB
Expand Down
11 changes: 7 additions & 4 deletions Ruffles/Core/NetworkEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ public struct NetworkEvent
/// </summary>
public void Recycle()
{
if (InternalMemory != null)
if (InternalMemory != null && MemoryManager != null)
{
if (!AllowUserRecycle)
if (AllowUserRecycle)
{
throw new MemoryException("Cannot deallocate non recyclable memory");
MemoryManager.DeAlloc(InternalMemory);
}
}

MemoryManager.DeAlloc(InternalMemory);
if (Connection != null && Connection.Dead && !Connection.Recycled)
{
Connection.Recycled = true;
}
}
}
Expand Down
131 changes: 66 additions & 65 deletions Ruffles/Core/RuffleSocket.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
Expand Down Expand Up @@ -482,56 +483,47 @@ public bool Disconnect(ulong connectionId, bool sendMessage)
return false;
}


private DateTime _lastTimeoutCheckRan = DateTime.MinValue;

private void StartNetworkLogic()
{
while (IsRunning)
{
try
{
InternalPollSocket();
InternalPollSocket(config.SocketPollTime);

if (simulator != null)
{
simulator.RunLoop();
}
}

// Run timeout loop once every ConnectionPollDelay ms
if ((DateTime.Now - _lastTimeoutCheckRan).TotalMilliseconds >= config.MinConnectionPollDelay)
if (config.EnablePacketMerging)
{
if (config.EnablePacketMerging)
{
CheckMergedPackets();
}
CheckMergedPackets();
}

if (config.EnableTimeouts)
{
CheckConnectionTimeouts();
}
if (config.EnableTimeouts)
{
CheckConnectionTimeouts();
}

if (config.EnableHeartbeats)
{
CheckConnectionHeartbeats();
}
if (config.EnableHeartbeats)
{
CheckConnectionHeartbeats();
}

if (config.EnableConnectionRequestResends)
{
CheckConnectionResends();
}
if (config.EnableConnectionRequestResends)
{
CheckConnectionResends();
}

if (config.EnableChannelUpdates)
{
RunChannelInternalUpdate();
}

if (config.EnablePathMTU)
{
RunPathMTU();
}
if (config.EnableChannelUpdates)
{
RunChannelInternalUpdate();
}

_lastTimeoutCheckRan = DateTime.Now;
if (config.EnablePathMTU)
{
RunPathMTU();
}
}
catch (Exception e)
Expand Down Expand Up @@ -818,41 +810,50 @@ private void CheckConnectionHeartbeats()
private EndPoint _fromIPv4Endpoint = new IPEndPoint(IPAddress.Any, 0);
private EndPoint _fromIPv6Endpoint = new IPEndPoint(IPAddress.IPv6Any, 0);

private readonly List<Socket> _selectSockets = new List<Socket>();
private void InternalPollSocket()
{
_selectSockets.Clear();

if (ipv4Socket != null)
{
_selectSockets.Add(ipv4Socket);
}

if (ipv6Socket != null)
{
_selectSockets.Add(ipv6Socket);
}

// Check what sockets have data
Socket.Select(_selectSockets, null, null, config.MaxSocketBlockMilliseconds * 1000);
private readonly List<Socket> _selectSockets = new List<Socket>();
private readonly Stopwatch _selectWatch = new Stopwatch();
private void InternalPollSocket(int ms)
{
_selectWatch.Reset();
_selectWatch.Start();

// Iterate the sockets with data
for (int i = 0; i < _selectSockets.Count; i++)
do
{
try
{
// Get a endpoint reference
EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null;

int size = _selectSockets[i].ReceiveFrom(incomingBuffer, 0, incomingBuffer.Length, SocketFlags.None, ref _endpoint);
_selectSockets.Clear();

if (ipv4Socket != null)
{
_selectSockets.Add(ipv4Socket);
}

if (ipv6Socket != null)
{
_selectSockets.Add(ipv6Socket);
}

// Check what sockets have data
Socket.Select(_selectSockets, null, null, (ms - (int)_selectWatch.ElapsedMilliseconds) * 1000);

// Iterate the sockets with data
for (int i = 0; i < _selectSockets.Count; i++)
{
try
{
// Get a endpoint reference
EndPoint _endpoint = _selectSockets[i].AddressFamily == AddressFamily.InterNetwork ? _fromIPv4Endpoint : _selectSockets[i].AddressFamily == AddressFamily.InterNetworkV6 ? _fromIPv6Endpoint : null;

int size = _selectSockets[i].ReceiveFrom(incomingBuffer, 0, incomingBuffer.Length, SocketFlags.None, ref _endpoint);

HandlePacket(new ArraySegment<byte>(incomingBuffer, 0, size), _endpoint);
}
catch (Exception e)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Error when receiving from socket: " + e);
}
}
} while (_selectWatch.ElapsedMilliseconds < ms);

HandlePacket(new ArraySegment<byte>(incomingBuffer, 0, size), _endpoint);
}
catch (Exception e)
{
if (Logging.CurrentLogLevel <= LogLevel.Error) Logging.LogError("Error when receiving from socket: " + e);
}
}
_selectWatch.Stop();
}

/// <summary>
Expand Down

0 comments on commit f4196cd

Please sign in to comment.