From cfdd576a945676a70f6944dbfe2031aac54929d8 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 20 Dec 2017 18:18:56 +0100 Subject: [PATCH] fixed problem with overriden UDP listener byte buffer --- src/core/Akka.Tests/IO/UdpIntegrationSpec.cs | 55 ++++++++++++++++-- src/core/Akka/IO/UdpListener.cs | 60 ++++++++------------ 2 files changed, 76 insertions(+), 39 deletions(-) diff --git a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs index 8c0258b81c6..f6b787eb621 100644 --- a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs @@ -16,6 +16,7 @@ using Xunit; using Xunit.Abstractions; using FluentAssertions; +using FsCheck; namespace Akka.Tests.IO { @@ -28,9 +29,7 @@ public UdpIntegrationSpec(ITestOutputHelper output) akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 akka.io.udp.direct-buffer-pool-limit = 100 - akka.io.udp.direct-buffer-size = 1024 - akka.io.udp.trace-logging = on - akka.loglevel = DEBUG", output) + akka.io.udp.direct-buffer-size = 1024", output) { _addresses = TestUtils.TemporaryServerAddresses(6, udp: true).ToArray(); } @@ -38,7 +37,7 @@ public UdpIntegrationSpec(ITestOutputHelper output) private IActorRef BindUdp(IPEndPoint address, IActorRef handler) { var commander = CreateTestProbe(); - commander.Send(Udp.Instance.Apply(Sys).Manager, new Udp.Bind(handler, address)); + commander.Send(Sys.Udp(), new Udp.Bind(handler, address)); commander.ExpectMsg(x => x.LocalAddress.Is(address)); return commander.Sender; } @@ -144,6 +143,54 @@ void CheckSendingToServer(int iteration) } } + [Fact] + public void The_UDP_Fire_and_Forget_implementation_must_be_able_to_send_several_packets_in_a_row() + { + var serverAddress = _addresses[0]; + var clientAddress = _addresses[1]; + var server = BindUdp(serverAddress, TestActor); + var client = BindUdp(clientAddress, TestActor); + + void CheckSendingToClient(ByteString expected) + { + ExpectMsg(x => + { + x.Data.ShouldBe(expected); + x.Sender.Is(serverAddress).ShouldBeTrue($"{x.Sender} was expected to be {serverAddress}"); + }); + } + + void CheckSendingToServer(ByteString expected) + { + ExpectMsg(x => + { + x.Data.ShouldBe(expected); + x.Sender.Is(clientAddress).ShouldBeTrue($"{x.Sender} was expected to be {clientAddress}"); + }); + } + + var data = new[] + { + ByteString.FromString("a"), + ByteString.FromString("bb"), + ByteString.FromString("ccc"), + ByteString.FromString("dddd"), + ByteString.FromString("eeeee"), + ByteString.FromString("ffffff"), + ByteString.FromString("ggggggg"), + ByteString.FromString("hhhhhhhh"), + ByteString.FromString("iiiiiiiii"), + ByteString.FromString("jjjjjjjjjj") + }; + + var iterations = data.Length; + for (int i = 0; i < iterations; i++) client.Tell(Udp.Send.Create(data[i], serverAddress)); + for (int i = 0; i < iterations; i++) CheckSendingToServer(data[i]); + + for (int i = 0; i < iterations; i++) server.Tell(Udp.Send.Create(data[i], clientAddress)); + for (int i = 0; i < iterations; i++) CheckSendingToClient(data[i]); + } + [Fact] public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind() { diff --git a/src/core/Akka/IO/UdpListener.cs b/src/core/Akka/IO/UdpListener.cs index 9c78e210c8e..807fec1af22 100644 --- a/src/core/Akka/IO/UdpListener.cs +++ b/src/core/Akka/IO/UdpListener.cs @@ -27,19 +27,14 @@ class UdpListener : WithUdpSend, IRequiresMessageQueue().FirstOrDefault() ?? new Inet.DatagramChannelCreator()).Create(); @@ -89,38 +84,33 @@ protected override bool Receive(object message) private bool ReadHandlers(object message) { - if (message is SuspendReading) - { - // TODO: What should we do here - we cant cancel a pending ReceiveAsync - return true; - } - if (message is ResumeReading) - { - ReceiveAsync(); - return true; - } - if (message is SocketReceived) + switch (message) { - var received = (SocketReceived) message; - DoReceive(received.EventArgs, _bind.Handler); - return true; + case SuspendReading _: + // TODO: What should we do here - we cant cancel a pending ReceiveAsync + return true; + case ResumeReading _: + ReceiveAsync(); + return true; + case SocketReceived _: + var received = (SocketReceived) message; + DoReceive(received.EventArgs, _bind.Handler); + return true; + case Unbind _: + Log.Debug("Unbinding endpoint [{0}]", _bind.LocalAddress); + try + { + Socket.Dispose(); + Sender.Tell(Unbound.Instance); + Log.Debug("Unbound endpoint [{0}], stopping listener", _bind.LocalAddress); + } + finally + { + Context.Stop(Self); + } + return true; } - if (message is Unbind) - { - Log.Debug("Unbinding endpoint [{0}]", _bind.LocalAddress); - try - { - Socket.Dispose(); - Sender.Tell(Unbound.Instance); - Log.Debug("Unbound endpoint [{0}], stopping listener", _bind.LocalAddress); - } - finally - { - Context.Stop(Self); - } - return true; - } return false; } @@ -128,7 +118,7 @@ private void DoReceive(SocketAsyncEventArgs e, IActorRef handler) { try { - handler.Tell(new Received(ByteString.FromBytes(e.Buffer, e.Offset, e.BytesTransferred), e.RemoteEndPoint)); + handler.Tell(new Received(ByteString.CopyFrom(e.Buffer, e.Offset, e.BytesTransferred), e.RemoteEndPoint)); ReceiveAsync(); } finally