Skip to content

Commit

Permalink
Optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
HakanL committed Dec 6, 2024
1 parent 062692e commit 1b8a3bd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/Haukcode.sACN/Haukcode.sACN.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
</When>
<Otherwise>
<ItemGroup>
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.8" />
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.10" />
</ItemGroup>
</Otherwise>
</Choose>
Expand Down
2 changes: 1 addition & 1 deletion src/Haukcode.sACN/Internal/ReceiveDataBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public abstract class ReceiveDataBase

public IPEndPoint Source { get; set; } = null!;

public IPEndPoint Destination { get; set; } = null!;
public IPEndPoint? Destination { get; set; }
}
}
102 changes: 54 additions & 48 deletions src/Haukcode.sACN/SACNClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,24 @@ public class SendData : HighPerfComm.SendData
public IPEndPoint? Destination { get; set; }
}

private const int ReceiveBufferSize = 20480;
private const int SendBufferSize = 680 * 100;
private const int ReceiveBufferSize = 680 * 40 * 400;
private const int SendBufferSize = 680 * 20 * 400;
private static readonly IPEndPoint _blankEndpoint = new(IPAddress.Any, 0);

private readonly Socket listenSocket;
private readonly Socket listenMulticastSocket;
private readonly Socket listenUnicastSocket;
private readonly Socket sendSocket;
private readonly ISubject<ReceiveDataPacket> packetSubject;
private readonly Dictionary<ushort, byte> sequenceIds = [];
private readonly Dictionary<ushort, byte> sequenceIdsSync = [];
private readonly object lockObject = new();
private readonly HashSet<ushort> dmxUniverses = [];
private readonly Dictionary<IPAddress, IPEndPoint> endPointCache = [];
private readonly Dictionary<IPAddress, (IPEndPoint EndPoint, bool Multicast)> endPointCache = [];
private readonly IPEndPoint localEndPoint;
private readonly Dictionary<ushort, IPEndPoint> universeMulticastEndpoints = [];

public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int port = 5568)
: base(() => new SendData(), ReceiveBufferSize)
: base(() => new SendData(), 1024)
{
if (senderId == Guid.Empty)
throw new ArgumentException("Invalid sender Id", nameof(senderId));
Expand All @@ -54,48 +55,27 @@ public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int

this.packetSubject = new Subject<ReceiveDataPacket>();

this.listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenSocket.ReceiveBufferSize = ReceiveBufferSize;
this.listenMulticastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenMulticastSocket.ReceiveBufferSize = ReceiveBufferSize;
SetSocketOptions(this.listenMulticastSocket);

// Set the SIO_UDP_CONNRESET ioctl to true for this UDP socket. If this UDP socket
// ever sends a UDP packet to a remote destination that exists but there is
// no socket to receive the packet, an ICMP port unreachable message is returned
// to the sender. By default, when this is received the next operation on the
// UDP socket that send the packet will receive a SocketException. The native
// (Winsock) error that is received is WSAECONNRESET (10054). Since we don't want
// to wrap each UDP socket operation in a try/except, we'll disable this error
// for the socket with this ioctl call.
try
{
uint IOC_IN = 0x80000000;
uint IOC_VENDOR = 0x18000000;
uint SIO_UDP_CONNRESET = IOC_IN | IOC_VENDOR | 12;

byte[] optionInValue = { Convert.ToByte(false) };
byte[] optionOutValue = new byte[4];
this.listenSocket.IOControl((int)SIO_UDP_CONNRESET, optionInValue, optionOutValue);
}
catch
{
Debug.WriteLine("Unable to set SIO_UDP_CONNRESET, maybe not supported.");
}

this.listenSocket.ExclusiveAddressUse = false;
this.listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
this.listenUnicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenUnicastSocket.ReceiveBufferSize = ReceiveBufferSize;
SetSocketOptions(this.listenUnicastSocket);

this.listenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
// Linux wants IPAddress.Any to get multicast/broadcast packets
this.listenMulticastSocket.Bind(new IPEndPoint(IPAddress.Any, port));
this.listenUnicastSocket.Bind(this.localEndPoint);

// Only join local LAN group
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 1);
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 1);

this.sendSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
ConfigureSendSocket(this.sendSocket);
}

private void ConfigureSendSocket(Socket socket)
private void SetSocketOptions(Socket socket)
{
socket.SendBufferSize = SendBufferSize;

// Set the SIO_UDP_CONNRESET ioctl to true for this UDP socket. If this UDP socket
// ever sends a UDP packet to a remote destination that exists but there is
// no socket to receive the packet, an ICMP port unreachable message is returned
Expand All @@ -121,6 +101,13 @@ private void ConfigureSendSocket(Socket socket)

socket.ExclusiveAddressUse = false;
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
}

private void ConfigureSendSocket(Socket socket)
{
socket.SendBufferSize = SendBufferSize;

SetSocketOptions(socket);

socket.Bind(this.localEndPoint);

Expand Down Expand Up @@ -149,14 +136,16 @@ private void ConfigureSendSocket(Socket socket)
/// </summary>
public IReadOnlyCollection<ushort> DMXUniverses => this.dmxUniverses.ToList();

protected override bool SupportsTwoReceivers => true;

public void JoinDMXUniverse(ushort universeId)
{
if (this.dmxUniverses.Contains(universeId))
throw new InvalidOperationException($"You have already joined the DMX Universe {universeId}");

// Join group
var option = new MulticastOption(SACNCommon.GetMulticastAddress(universeId), this.localEndPoint.Address);
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, option);
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, option);

// Add to the list of universes we have joined
this.dmxUniverses.Add(universeId);
Expand All @@ -169,7 +158,7 @@ public void DropDMXUniverse(ushort universeId)

// Drop group
var option = new MulticastOption(SACNCommon.GetMulticastAddress(universeId), this.localEndPoint.Address);
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.DropMembership, option);
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.DropMembership, option);

// Remove from the list of universes we have joined
this.dmxUniverses.Remove(universeId);
Expand Down Expand Up @@ -235,11 +224,11 @@ await base.QueuePacket(packet.Length, important, (newSendData, memory) =>
{
if (!this.endPointCache.TryGetValue(destination, out var ipEndPoint))
{
ipEndPoint = new IPEndPoint(destination, this.localEndPoint.Port);
ipEndPoint = (new IPEndPoint(destination, this.localEndPoint.Port), destination.GetAddressBytes()[0] == 239);
this.endPointCache.Add(destination, ipEndPoint);
}

newSendData.Destination = ipEndPoint;
newSendData.Destination = ipEndPoint.Multicast ? null : ipEndPoint.EndPoint;
}

newSendData.UniverseId = universeId;
Expand Down Expand Up @@ -288,13 +277,23 @@ protected override void Dispose(bool disposing)
{
try
{
this.listenSocket.Shutdown(SocketShutdown.Both);
this.listenMulticastSocket.Shutdown(SocketShutdown.Both);
}
catch
{
}
this.listenMulticastSocket.Close();
this.listenMulticastSocket.Dispose();

try
{
this.listenUnicastSocket.Shutdown(SocketShutdown.Both);
}
catch
{
}
this.listenSocket.Close();
this.listenSocket.Dispose();
this.listenUnicastSocket.Close();
this.listenUnicastSocket.Dispose();

try
{
Expand Down Expand Up @@ -324,9 +323,16 @@ protected override ValueTask<int> SendPacketAsync(SendData sendData, ReadOnlyMem
return this.sendSocket.SendToAsync(payload, SocketFlags.None, destination);
}

protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData(Memory<byte> memory, CancellationToken cancelToken)
protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData1(Memory<byte> memory, CancellationToken cancelToken)
{
var result = await this.listenMulticastSocket.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);

return (result.ReceivedBytes, result);
}

protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData2(Memory<byte> memory, CancellationToken cancelToken)
{
var result = await this.listenSocket.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);
var result = await this.listenUnicastSocket.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);

return (result.ReceivedBytes, result);
}
Expand All @@ -346,11 +352,11 @@ protected override void ParseReceiveData(ReadOnlyMemory<byte> memory, SocketRece

if (!this.endPointCache.TryGetValue(result.PacketInformation.Address, out var ipEndPoint))
{
ipEndPoint = new IPEndPoint(result.PacketInformation.Address, this.localEndPoint.Port);
ipEndPoint = (new IPEndPoint(result.PacketInformation.Address, this.localEndPoint.Port), result.PacketInformation.Address.GetAddressBytes()[0] == 239);
this.endPointCache.Add(result.PacketInformation.Address, ipEndPoint);
}

newPacket.Destination = ipEndPoint;
newPacket.Destination = ipEndPoint.Multicast ? null : ipEndPoint.EndPoint;

this.packetSubject.OnNext(newPacket);
}
Expand Down

0 comments on commit 1b8a3bd

Please sign in to comment.