Skip to content

Commit

Permalink
fixed problem with overriden UDP listener byte buffer (#3218)
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath authored and Aaronontheweb committed Dec 20, 2017
1 parent 05b9234 commit 75fbb4f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 39 deletions.
55 changes: 51 additions & 4 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;
using FsCheck;

namespace Akka.Tests.IO
{
Expand All @@ -28,17 +29,15 @@ public UdpIntegrationSpec(ITestOutputHelper output)
akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024
akka.io.udp.trace-logging = on
akka.loglevel = DEBUG", output)
akka.io.udp.direct-buffer-size = 1024", output)
{
_addresses = TestUtils.TemporaryServerAddresses(6, udp: true).ToArray();
}

private IActorRef BindUdp(IPEndPoint address, IActorRef handler)
{
var commander = CreateTestProbe();
commander.Send(Udp.Instance.Apply(Sys).Manager, new Udp.Bind(handler, address));
commander.Send(Sys.Udp(), new Udp.Bind(handler, address));
commander.ExpectMsg<Udp.Bound>(x => x.LocalAddress.Is(address));
return commander.Sender;
}
Expand Down Expand Up @@ -144,6 +143,54 @@ void CheckSendingToServer(int iteration)
}
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_be_able_to_send_several_packets_in_a_row()
{
var serverAddress = _addresses[0];
var clientAddress = _addresses[1];
var server = BindUdp(serverAddress, TestActor);
var client = BindUdp(clientAddress, TestActor);

void CheckSendingToClient(ByteString expected)
{
ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(expected);
x.Sender.Is(serverAddress).ShouldBeTrue($"{x.Sender} was expected to be {serverAddress}");
});
}

void CheckSendingToServer(ByteString expected)
{
ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(expected);
x.Sender.Is(clientAddress).ShouldBeTrue($"{x.Sender} was expected to be {clientAddress}");
});
}

var data = new[]
{
ByteString.FromString("a"),
ByteString.FromString("bb"),
ByteString.FromString("ccc"),
ByteString.FromString("dddd"),
ByteString.FromString("eeeee"),
ByteString.FromString("ffffff"),
ByteString.FromString("ggggggg"),
ByteString.FromString("hhhhhhhh"),
ByteString.FromString("iiiiiiiii"),
ByteString.FromString("jjjjjjjjjj")
};

var iterations = data.Length;
for (int i = 0; i < iterations; i++) client.Tell(Udp.Send.Create(data[i], serverAddress));
for (int i = 0; i < iterations; i++) CheckSendingToServer(data[i]);

for (int i = 0; i < iterations; i++) server.Tell(Udp.Send.Create(data[i], clientAddress));
for (int i = 0; i < iterations; i++) CheckSendingToClient(data[i]);
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind()
{
Expand Down
60 changes: 25 additions & 35 deletions src/core/Akka/IO/UdpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,14 @@ class UdpListener : WithUdpSend, IRequiresMessageQueue<IUnboundedMessageQueueSem
{
private readonly IActorRef _bindCommander;
private readonly Bind _bind;

protected readonly ILoggingAdapter Log = Context.GetLogger();

private IActorRef _selector;

public UdpListener(UdpExt udp, IActorRef bindCommander, Bind bind)
{
Udp = udp;
_bindCommander = bindCommander;
_bind = bind;

_selector = Context.Parent;

Context.Watch(bind.Handler); // sign death pact

Socket = (bind.Options.OfType<Inet.DatagramChannelCreator>().FirstOrDefault() ?? new Inet.DatagramChannelCreator()).Create();
Expand Down Expand Up @@ -89,46 +84,41 @@ protected override bool Receive(object message)

private bool ReadHandlers(object message)
{
if (message is SuspendReading)
{
// TODO: What should we do here - we cant cancel a pending ReceiveAsync
return true;
}
if (message is ResumeReading)
{
ReceiveAsync();
return true;
}
if (message is SocketReceived)
switch (message)
{
var received = (SocketReceived) message;
DoReceive(received.EventArgs, _bind.Handler);
return true;
case SuspendReading _:
// TODO: What should we do here - we cant cancel a pending ReceiveAsync
return true;
case ResumeReading _:
ReceiveAsync();
return true;
case SocketReceived _:
var received = (SocketReceived) message;
DoReceive(received.EventArgs, _bind.Handler);
return true;
case Unbind _:
Log.Debug("Unbinding endpoint [{0}]", _bind.LocalAddress);
try
{
Socket.Dispose();
Sender.Tell(Unbound.Instance);
Log.Debug("Unbound endpoint [{0}], stopping listener", _bind.LocalAddress);
}
finally
{
Context.Stop(Self);
}
return true;
}

if (message is Unbind)
{
Log.Debug("Unbinding endpoint [{0}]", _bind.LocalAddress);
try
{
Socket.Dispose();
Sender.Tell(Unbound.Instance);
Log.Debug("Unbound endpoint [{0}], stopping listener", _bind.LocalAddress);
}
finally
{
Context.Stop(Self);
}
return true;
}
return false;
}

private void DoReceive(SocketAsyncEventArgs e, IActorRef handler)
{
try
{
handler.Tell(new Received(ByteString.FromBytes(e.Buffer, e.Offset, e.BytesTransferred), e.RemoteEndPoint));
handler.Tell(new Received(ByteString.CopyFrom(e.Buffer, e.Offset, e.BytesTransferred), e.RemoteEndPoint));
ReceiveAsync();
}
finally
Expand Down

0 comments on commit 75fbb4f

Please sign in to comment.