From 83d856182efe04eeb244ed084731407c8ccf58db Mon Sep 17 00:00:00 2001 From: Anton Firszov Date: Mon, 16 Nov 2020 13:50:48 +0100 Subject: [PATCH] Organize SendReceive tests and isolate non-parallel test collection (#44591) Some `SendReceive` socket tests may be prone to timing issues on CI. This seems to be the root cause of #1712. We need a reliable way to run such tests to unblock the work on new UDP socket API-s in #33418. This PR defines a new `SendReceiveNonParallel` test group, moving `SendToRecvFrom_Datagram_UDP` into that group. Since this is already a significant reorganization, it seemed reasonable to also: - Harmonize naming: all SendReceive test classses are now named either `SendReceive_[SubVariant]` or `SendReceiveNonParallel_[SubVariant]` - Split `SendReceive.cs` into multiple files: - `SendReceive.cs` for the parallel variants - `SendReceiveNonParallel.cs` for the new, non-parallel variants - Rename the non-generic class `SendReceive` to `SendReceiveMisc` (to avoid name collision and confusion with `SendReceive`) and move it to `SendReceiveMisc.cs` - Move `SendReceiveListener` and `SendReceiveUdpClient` to separate files, rename `SendReceiveListener` to `SendReceiveTcpClient` --- .../FunctionalTests/InlineCompletions.Unix.cs | 10 +- .../tests/FunctionalTests/LoggingTest.cs | 16 +- .../{ => SendReceive}/SendReceive.cs | 552 +----------------- .../SendReceive/SendReceiveMisc.cs | 286 +++++++++ .../SendReceive/SendReceiveNonParallel.cs | 147 +++++ .../SendReceive/SendReceiveTcpClient.cs | 82 +++ .../SendReceive/SendReceiveUdpClient.cs | 89 +++ .../System.Net.Sockets.Tests.csproj | 6 +- .../tests/FunctionalTests/TelemetryTest.cs | 16 +- 9 files changed, 648 insertions(+), 556 deletions(-) rename src/libraries/System.Net.Sockets/tests/FunctionalTests/{ => SendReceive}/SendReceive.cs (71%) create mode 100644 src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveMisc.cs create mode 100644 src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveNonParallel.cs create mode 100644 src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveTcpClient.cs create mode 100644 src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveUdpClient.cs diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs index a8c9d93d960fc..63103ab7faea2 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/InlineCompletions.Unix.cs @@ -28,11 +28,11 @@ public void InlineSocketContinuations() await new AcceptEap(null).Accept_ConcurrentAcceptsAfterConnects_Success(5); // Send/Receive tests - await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false); - await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false); - await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false); - await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true, ipv6Server: false, dualModeClient: false); - await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false, ipv6Server: false, dualModeClient: false); + await new SendReceive_Eap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false); + await new SendReceive_Eap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false); + await new SendReceive_Eap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false); + await new SendReceive_Eap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true, ipv6Server: false, dualModeClient: false); + await new SendReceive_Eap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false, ipv6Server: false, dualModeClient: false); }, options).Dispose(); } } diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/LoggingTest.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/LoggingTest.cs index 77f12d7679c2b..a683bc3544c0a 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/LoggingTest.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/LoggingTest.cs @@ -44,17 +44,17 @@ public void EventSource_EventsRaisedAsExpected() { // Invoke several tests to execute code paths while tracing is enabled - new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); - new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); + new SendReceive_Sync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); + new SendReceive_Sync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); - new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); - new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); + new SendReceive_Task(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); + new SendReceive_Task(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); - new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); - new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); + new SendReceive_Eap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); + new SendReceive_Eap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); - new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); - new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); + new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter(); + new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter(); new NetworkStreamTest().CopyToAsync_AllDataCopied(4096, true).GetAwaiter().GetResult(); new NetworkStreamTest().Timeout_Roundtrips().GetAwaiter().GetResult(); diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceive.cs similarity index 71% rename from src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs rename to src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceive.cs index ad2c3f7376f5e..dfef96ef16a97 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceive.cs @@ -7,7 +7,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.RemoteExecutor; -using Microsoft.DotNet.XUnitExtensions; using Xunit; using Xunit.Abstractions; using Xunit.Sdk; @@ -45,91 +44,6 @@ public async Task InvalidArguments_Throws(int? length, int offset, int count) } } - public static IEnumerable LoopbackWithBool => - from addr in Loopbacks - from b in new[] { false, true } - select new object[] { addr[0], b }; - - [ActiveIssue("https://github.com/dotnet/runtime/issues/1712")] - [OuterLoop] - [Theory] - [MemberData(nameof(LoopbackWithBool))] - public async Task SendToRecvFrom_Datagram_UDP(IPAddress loopbackAddress, bool useClone) - { - IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; - - const int DatagramSize = 256; - const int DatagramsToSend = 256; - const int AckTimeout = 10000; - const int TestTimeout = 30000; - - using var origLeft = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - using var origRight = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - origLeft.BindToAnonymousPort(leftAddress); - origRight.BindToAnonymousPort(rightAddress); - - using var left = useClone ? new Socket(origLeft.SafeHandle) : origLeft; - using var right = useClone ? new Socket(origRight.SafeHandle) : origRight; - - var leftEndpoint = (IPEndPoint)left.LocalEndPoint; - var rightEndpoint = (IPEndPoint)right.LocalEndPoint; - - var receiverAck = new SemaphoreSlim(0); - var senderAck = new SemaphoreSlim(0); - - _output.WriteLine($"{DateTime.Now}: Sending data from {rightEndpoint} to {leftEndpoint}"); - - var receivedChecksums = new uint?[DatagramsToSend]; - Task leftThread = Task.Run(async () => - { - EndPoint remote = leftEndpoint.Create(leftEndpoint.Serialize()); - var recvBuffer = new byte[DatagramSize]; - for (int i = 0; i < DatagramsToSend; i++) - { - SocketReceiveFromResult result = await ReceiveFromAsync( - left, new ArraySegment(recvBuffer), remote); - Assert.Equal(DatagramSize, result.ReceivedBytes); - Assert.Equal(rightEndpoint, result.RemoteEndPoint); - - int datagramId = recvBuffer[0]; - Assert.Null(receivedChecksums[datagramId]); - receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, result.ReceivedBytes); - - receiverAck.Release(); - bool gotAck = await senderAck.WaitAsync(TestTimeout); - Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {TestTimeout} for senderAck in iteration {i}"); - } - }); - - var sentChecksums = new uint[DatagramsToSend]; - using (right) - { - var random = new Random(); - var sendBuffer = new byte[DatagramSize]; - for (int i = 0; i < DatagramsToSend; i++) - { - random.NextBytes(sendBuffer); - sendBuffer[0] = (byte)i; - - int sent = await SendToAsync(right, new ArraySegment(sendBuffer), leftEndpoint); - - bool gotAck = await receiverAck.WaitAsync(AckTimeout); - Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {AckTimeout} for receiverAck in iteration {i} after sending {sent}. Receiver is in {leftThread.Status}"); - senderAck.Release(); - - Assert.Equal(DatagramSize, sent); - sentChecksums[i] = Fletcher32.Checksum(sendBuffer, 0, sent); - } - } - - await leftThread; - for (int i = 0; i < DatagramsToSend; i++) - { - Assert.NotNull(receivedChecksums[i]); - Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); - } - } - [OuterLoop] [Theory] [MemberData(nameof(LoopbacksAndBuffers))] @@ -1156,439 +1070,9 @@ await RetryHelper.ExecuteAsync(async () => } } - public class SendReceive - { - [Fact] - public void SendRecvIovMaxTcp_Success() - { - // sending/receiving more than IOV_MAX segments causes EMSGSIZE on some platforms. - // This is handled internally for stream sockets so this error shouldn't surface. - - // Use more than IOV_MAX (1024 on Linux & macOS) segments. - const int SegmentCount = 2400; - using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) - { - server.BindToAnonymousPort(IPAddress.Loopback); - server.Listen(1); - - var sendBuffer = new byte[SegmentCount]; - Task serverProcessingTask = Task.Run(() => - { - using (Socket acceptSocket = server.Accept()) - { - // send data as SegmentCount (> IOV_MAX) 1-byte segments. - var sendSegments = new List>(); - for (int i = 0; i < SegmentCount; i++) - { - sendBuffer[i] = (byte)i; - sendSegments.Add(new ArraySegment(sendBuffer, i, 1)); - } - SocketError error; - // Send blocks until all segments are sent. - int bytesSent = acceptSocket.Send(sendSegments, SocketFlags.None, out error); - - Assert.Equal(SegmentCount, bytesSent); - Assert.Equal(SocketError.Success, error); - - acceptSocket.Shutdown(SocketShutdown.Send); - } - }); - - using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) - { - client.Connect(server.LocalEndPoint); - - // receive data as 1-byte segments. - var receiveBuffer = new byte[SegmentCount]; - var receiveSegments = new List>(); - for (int i = 0; i < SegmentCount; i++) - { - receiveSegments.Add(new ArraySegment(receiveBuffer, i, 1)); - } - var bytesReceivedTotal = 0; - do - { - SocketError error; - // Receive can return up to IOV_MAX segments. - int bytesReceived = client.Receive(receiveSegments, SocketFlags.None, out error); - bytesReceivedTotal += bytesReceived; - // Offset receiveSegments for next Receive. - receiveSegments.RemoveRange(0, bytesReceived); - - Assert.NotEqual(0, bytesReceived); - Assert.Equal(SocketError.Success, error); - } while (bytesReceivedTotal != SegmentCount); - - AssertExtensions.Equal(sendBuffer, receiveBuffer); - } - } - } - - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] - public void SendIovMaxUdp_SuccessOrMessageSize() - { - // sending more than IOV_MAX segments causes EMSGSIZE on some platforms. - // We handle this for stream sockets by truncating. - // This test verifies we are not truncating non-stream sockets. - - // Use more than IOV_MAX (1024 on Linux & macOS) segments - // and less than Ethernet MTU. - const int SegmentCount = 1200; - using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) - { - socket.BindToAnonymousPort(IPAddress.Loopback); - // Use our own address as destination. - socket.Connect(socket.LocalEndPoint); - - var sendBuffer = new byte[SegmentCount]; - var sendSegments = new List>(); - for (int i = 0; i < SegmentCount; i++) - { - sendBuffer[i] = (byte)i; - sendSegments.Add(new ArraySegment(sendBuffer, i, 1)); - } - - SocketError error; - // send data as SegmentCount (> IOV_MAX) 1-byte segments. - int bytesSent = socket.Send(sendSegments, SocketFlags.None, out error); - if (error == SocketError.Success) - { - // platform sent message with > IOV_MAX segments - Assert.Equal(SegmentCount, bytesSent); - } - else - { - // platform returns EMSGSIZE - Assert.Equal(SocketError.MessageSize, error); - Assert.Equal(0, bytesSent); - } - } - } - - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] - public async Task ReceiveIovMaxUdp_SuccessOrMessageSize() - { - // receiving more than IOV_MAX segments causes EMSGSIZE on some platforms. - // We handle this for stream sockets by truncating. - // This test verifies we are not truncating non-stream sockets. - - // Use more than IOV_MAX (1024 on Linux & macOS) segments - // and less than Ethernet MTU. - const int SegmentCount = 1200; - var sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - sender.BindToAnonymousPort(IPAddress.Loopback); - var receiver = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - receiver.Connect(sender.LocalEndPoint); // only receive from sender - EndPoint receiverEndPoint = receiver.LocalEndPoint; - - Barrier b = new Barrier(2); - - Task receiveTask = Task.Run(() => - { - using (receiver) - { - var receiveBuffer = new byte[SegmentCount]; - var receiveSegments = new List>(); - for (int i = 0; i < SegmentCount; i++) - { - receiveSegments.Add(new ArraySegment(receiveBuffer, i, 1)); - } - // receive data as SegmentCount (> IOV_MAX) 1-byte segments. - SocketError error; - // Signal we are ready to receive. - b.SignalAndWait(); - int bytesReceived = receiver.Receive(receiveSegments, SocketFlags.None, out error); - - if (error == SocketError.Success) - { - // platform received message in > IOV_MAX segments - Assert.Equal(SegmentCount, bytesReceived); - } - else - { - // platform returns EMSGSIZE - Assert.Equal(SocketError.MessageSize, error); - Assert.Equal(0, bytesReceived); - } - } - }); - - using (sender) - { - sender.Connect(receiverEndPoint); - - // Synchronize and wait for receiving task to be ready. - b.SignalAndWait(); - - var sendBuffer = new byte[SegmentCount]; - for (int i = 0; i < 10; i++) // UDPRedundancy - { - int bytesSent = sender.Send(sendBuffer); - Assert.Equal(SegmentCount, bytesSent); - await Task.WhenAny(receiveTask, Task.Delay(3)); - if (receiveTask.IsCompleted) - { - break; - } - } - } - - Assert.True(receiveTask.IsCompleted); - await receiveTask; - } - - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] - [PlatformSpecific(~TestPlatforms.Windows)] // All data is sent, even when very large (100M). - public void SocketSendWouldBlock_ReturnsBytesSent() - { - using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) - using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) - { - // listen - server.BindToAnonymousPort(IPAddress.Loopback); - server.Listen(1); - // connect - client.Connect(server.LocalEndPoint); - // accept - using (Socket socket = server.Accept()) - { - // We send a large amount of data but don't read it. - // A chunck will be sent, attempts to send more will return SocketError.WouldBlock. - // Socket.Send must return the success of the partial send. - socket.Blocking = false; - var data = new byte[5_000_000]; - SocketError error; - int bytesSent = socket.Send(data, 0, data.Length, SocketFlags.None, out error); - - Assert.Equal(SocketError.Success, error); - Assert.InRange(bytesSent, 1, data.Length - 1); - } - } - } - - [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] - [PlatformSpecific(TestPlatforms.AnyUnix)] - public async Task Socket_ReceiveFlags_Success() - { - using (var sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) - using (var receiver = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) - { - receiver.BindToAnonymousPort(IPAddress.Loopback); - sender.Connect(receiver.LocalEndPoint); - sender.SendBufferSize = 1500; - - var data = new byte[500]; - data[0] = data[499] = 1; - - Assert.Equal(500, sender.Send(data)); - data[0] = data[499] = 2; - Assert.Equal(500, sender.Send(data)); - - var tcs = new TaskCompletionSource(); - SocketAsyncEventArgs args = new SocketAsyncEventArgs(); - - var receiveBufer = new byte[600]; - receiveBufer[0] = data[499] = 0; - - args.SetBuffer(receiveBufer, 0, receiveBufer.Length); - args.Completed += delegate { tcs.SetResult(); }; - - // First peek at the message. - args.SocketFlags = SocketFlags.Peek; - if (receiver.ReceiveAsync(args)) - { - await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); - } - Assert.Equal(SocketFlags.None, args.SocketFlags); - Assert.Equal(1, receiveBufer[0]); - Assert.Equal(1, receiveBufer[499]); - receiveBufer[0] = receiveBufer[499] = 0; - - // Now, we should be able to get same message again. - tcs = new TaskCompletionSource(); - args.SocketFlags = SocketFlags.None; - if (receiver.ReceiveAsync(args)) - { - await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); - } - Assert.Equal(SocketFlags.None, args.SocketFlags); - Assert.Equal(1, receiveBufer[0]); - Assert.Equal(1, receiveBufer[499]); - receiveBufer[0] = receiveBufer[499] = 0; - - // Set buffer smaller than message. - tcs = new TaskCompletionSource(); - args.SetBuffer(receiveBufer, 0, 100); - if (receiver.ReceiveAsync(args)) - { - await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); - } - Assert.Equal(SocketFlags.Truncated, args.SocketFlags); - Assert.Equal(2, receiveBufer[0]); - - // There should be no more data. - Assert.Equal(0, receiver.Available); - } - } - } - - public sealed class SendReceiveUdpClient : MemberDatas - { - [OuterLoop] - [Theory] - [MemberData(nameof(Loopbacks))] - public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackAddress) - { - IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; - - const int DatagramSize = 256; - const int DatagramsToSend = 256; - const int AckTimeout = 20000; - const int TestTimeout = 60000; - - using (var left = new UdpClient(new IPEndPoint(leftAddress, 0))) - using (var right = new UdpClient(new IPEndPoint(rightAddress, 0))) - { - var leftEndpoint = (IPEndPoint)left.Client.LocalEndPoint; - var rightEndpoint = (IPEndPoint)right.Client.LocalEndPoint; - - var receiverAck = new ManualResetEventSlim(); - var senderAck = new ManualResetEventSlim(); - - var receivedChecksums = new uint?[DatagramsToSend]; - int receivedDatagrams = 0; - - Task receiverTask = Task.Run(async () => - { - for (; receivedDatagrams < DatagramsToSend; receivedDatagrams++) - { - UdpReceiveResult result = await left.ReceiveAsync(); - - receiverAck.Set(); - Assert.True(senderAck.Wait(AckTimeout)); - senderAck.Reset(); - - Assert.Equal(DatagramSize, result.Buffer.Length); - Assert.Equal(rightEndpoint, result.RemoteEndPoint); - - int datagramId = (int)result.Buffer[0]; - Assert.Null(receivedChecksums[datagramId]); - - receivedChecksums[datagramId] = Fletcher32.Checksum(result.Buffer, 0, result.Buffer.Length); - } - }); - - var sentChecksums = new uint[DatagramsToSend]; - int sentDatagrams = 0; - - Task senderTask = Task.Run(async () => - { - var random = new Random(); - var sendBuffer = new byte[DatagramSize]; - - for (; sentDatagrams < DatagramsToSend; sentDatagrams++) - { - random.NextBytes(sendBuffer); - sendBuffer[0] = (byte)sentDatagrams; - - int sent = await right.SendAsync(sendBuffer, DatagramSize, leftEndpoint); - - Assert.True(receiverAck.Wait(AckTimeout)); - receiverAck.Reset(); - senderAck.Set(); - - Assert.Equal(DatagramSize, sent); - sentChecksums[sentDatagrams] = Fletcher32.Checksum(sendBuffer, 0, sent); - } - }); - - await (new[] { receiverTask, senderTask }).WhenAllOrAnyFailed(TestTimeout); - for (int i = 0; i < DatagramsToSend; i++) - { - Assert.NotNull(receivedChecksums[i]); - Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); - } - } - } - } - - public sealed class SendReceiveListener : MemberDatas - { - [OuterLoop] - [Theory] - [MemberData(nameof(Loopbacks))] - public async Task SendRecvAsync_TcpListener_TcpClient(IPAddress listenAt) - { - const int BytesToSend = 123456; - const int ListenBacklog = 1; - const int LingerTime = 10; - const int TestTimeout = 30000; - - var listener = new TcpListener(listenAt, 0); - listener.Start(ListenBacklog); - - int bytesReceived = 0; - var receivedChecksum = new Fletcher32(); - Task serverTask = Task.Run(async () => - { - using (TcpClient remote = await listener.AcceptTcpClientAsync()) - using (NetworkStream stream = remote.GetStream()) - { - var recvBuffer = new byte[256]; - while (true) - { - int received = await stream.ReadAsync(recvBuffer, 0, recvBuffer.Length); - if (received == 0) - { - break; - } - - bytesReceived += received; - receivedChecksum.Add(recvBuffer, 0, received); - } - } - }); - - int bytesSent = 0; - var sentChecksum = new Fletcher32(); - Task clientTask = Task.Run(async () => - { - var clientEndpoint = (IPEndPoint)listener.LocalEndpoint; - - using (var client = new TcpClient(clientEndpoint.AddressFamily)) - { - await client.ConnectAsync(clientEndpoint.Address, clientEndpoint.Port); - - using (NetworkStream stream = client.GetStream()) - { - var random = new Random(); - var sendBuffer = new byte[512]; - for (int remaining = BytesToSend, sent = 0; remaining > 0; remaining -= sent) - { - random.NextBytes(sendBuffer); - - sent = Math.Min(sendBuffer.Length, remaining); - await stream.WriteAsync(sendBuffer, 0, sent); - - bytesSent += sent; - sentChecksum.Add(sendBuffer, 0, sent); - } - - client.LingerState = new LingerOption(true, LingerTime); - } - } - }); - - await (new[] { serverTask, clientTask }).WhenAllOrAnyFailed(TestTimeout); - - Assert.Equal(bytesSent, bytesReceived); - Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); - } - } - - public sealed class SendReceiveSync : SendReceive + public sealed class SendReceive_Sync : SendReceive { - public SendReceiveSync(ITestOutputHelper output) : base(output) { } + public SendReceive_Sync(ITestOutputHelper output) : base(output) { } [OuterLoop] [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] @@ -1641,39 +1125,39 @@ select Task.Factory.StartNew(() => pair.Item1.Receive(new byte[1]), Cancellation } } - public sealed class SendReceiveSyncForceNonBlocking : SendReceive + public sealed class SendReceive_SyncForceNonBlocking : SendReceive { - public SendReceiveSyncForceNonBlocking(ITestOutputHelper output) : base(output) {} + public SendReceive_SyncForceNonBlocking(ITestOutputHelper output) : base(output) {} } - public sealed class SendReceiveApm : SendReceive + public sealed class SendReceive_Apm : SendReceive { - public SendReceiveApm(ITestOutputHelper output) : base(output) {} + public SendReceive_Apm(ITestOutputHelper output) : base(output) {} } - public sealed class SendReceiveTask : SendReceive + public sealed class SendReceive_Task : SendReceive { - public SendReceiveTask(ITestOutputHelper output) : base(output) {} + public SendReceive_Task(ITestOutputHelper output) : base(output) {} } - public sealed class SendReceiveEap : SendReceive + public sealed class SendReceive_Eap : SendReceive { - public SendReceiveEap(ITestOutputHelper output) : base(output) {} + public SendReceive_Eap(ITestOutputHelper output) : base(output) {} } - public sealed class SendReceiveSpanSync : SendReceive + public sealed class SendReceive_SpanSync : SendReceive { - public SendReceiveSpanSync(ITestOutputHelper output) : base(output) { } + public SendReceive_SpanSync(ITestOutputHelper output) : base(output) { } } - public sealed class SendReceiveSpanSyncForceNonBlocking : SendReceive + public sealed class SendReceive_SpanSyncForceNonBlocking : SendReceive { - public SendReceiveSpanSyncForceNonBlocking(ITestOutputHelper output) : base(output) { } + public SendReceive_SpanSyncForceNonBlocking(ITestOutputHelper output) : base(output) { } } - public sealed class SendReceiveMemoryArrayTask : SendReceive + public sealed class SendReceive_MemoryArrayTask : SendReceive { - public SendReceiveMemoryArrayTask(ITestOutputHelper output) : base(output) { } + public SendReceive_MemoryArrayTask(ITestOutputHelper output) : base(output) { } [Fact] public async Task Precanceled_Throws() @@ -1859,8 +1343,8 @@ await Task.Run(async delegate // escape the xunit sync context / task scheduler } } - public sealed class SendReceiveMemoryNativeTask : SendReceive + public sealed class SendReceive_MemoryNativeTask : SendReceive { - public SendReceiveMemoryNativeTask(ITestOutputHelper output) : base(output) { } + public SendReceive_MemoryNativeTask(ITestOutputHelper output) : base(output) { } } } diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveMisc.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveMisc.cs new file mode 100644 index 0000000000000..4fa40e5d7e9c6 --- /dev/null +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveMisc.cs @@ -0,0 +1,286 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.Sockets.Tests +{ + public class SendReceiveMisc + { + [Fact] + public void SendRecvIovMaxTcp_Success() + { + // sending/receiving more than IOV_MAX segments causes EMSGSIZE on some platforms. + // This is handled internally for stream sockets so this error shouldn't surface. + + // Use more than IOV_MAX (1024 on Linux & macOS) segments. + const int SegmentCount = 2400; + using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + server.BindToAnonymousPort(IPAddress.Loopback); + server.Listen(1); + + var sendBuffer = new byte[SegmentCount]; + Task serverProcessingTask = Task.Run(() => + { + using (Socket acceptSocket = server.Accept()) + { + // send data as SegmentCount (> IOV_MAX) 1-byte segments. + var sendSegments = new List>(); + for (int i = 0; i < SegmentCount; i++) + { + sendBuffer[i] = (byte)i; + sendSegments.Add(new ArraySegment(sendBuffer, i, 1)); + } + SocketError error; + // Send blocks until all segments are sent. + int bytesSent = acceptSocket.Send(sendSegments, SocketFlags.None, out error); + + Assert.Equal(SegmentCount, bytesSent); + Assert.Equal(SocketError.Success, error); + + acceptSocket.Shutdown(SocketShutdown.Send); + } + }); + + using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + client.Connect(server.LocalEndPoint); + + // receive data as 1-byte segments. + var receiveBuffer = new byte[SegmentCount]; + var receiveSegments = new List>(); + for (int i = 0; i < SegmentCount; i++) + { + receiveSegments.Add(new ArraySegment(receiveBuffer, i, 1)); + } + var bytesReceivedTotal = 0; + do + { + SocketError error; + // Receive can return up to IOV_MAX segments. + int bytesReceived = client.Receive(receiveSegments, SocketFlags.None, out error); + bytesReceivedTotal += bytesReceived; + // Offset receiveSegments for next Receive. + receiveSegments.RemoveRange(0, bytesReceived); + + Assert.NotEqual(0, bytesReceived); + Assert.Equal(SocketError.Success, error); + } while (bytesReceivedTotal != SegmentCount); + + AssertExtensions.Equal(sendBuffer, receiveBuffer); + } + } + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] + public void SendIovMaxUdp_SuccessOrMessageSize() + { + // sending more than IOV_MAX segments causes EMSGSIZE on some platforms. + // We handle this for stream sockets by truncating. + // This test verifies we are not truncating non-stream sockets. + + // Use more than IOV_MAX (1024 on Linux & macOS) segments + // and less than Ethernet MTU. + const int SegmentCount = 1200; + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) + { + socket.BindToAnonymousPort(IPAddress.Loopback); + // Use our own address as destination. + socket.Connect(socket.LocalEndPoint); + + var sendBuffer = new byte[SegmentCount]; + var sendSegments = new List>(); + for (int i = 0; i < SegmentCount; i++) + { + sendBuffer[i] = (byte)i; + sendSegments.Add(new ArraySegment(sendBuffer, i, 1)); + } + + SocketError error; + // send data as SegmentCount (> IOV_MAX) 1-byte segments. + int bytesSent = socket.Send(sendSegments, SocketFlags.None, out error); + if (error == SocketError.Success) + { + // platform sent message with > IOV_MAX segments + Assert.Equal(SegmentCount, bytesSent); + } + else + { + // platform returns EMSGSIZE + Assert.Equal(SocketError.MessageSize, error); + Assert.Equal(0, bytesSent); + } + } + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] + public async Task ReceiveIovMaxUdp_SuccessOrMessageSize() + { + // receiving more than IOV_MAX segments causes EMSGSIZE on some platforms. + // We handle this for stream sockets by truncating. + // This test verifies we are not truncating non-stream sockets. + + // Use more than IOV_MAX (1024 on Linux & macOS) segments + // and less than Ethernet MTU. + const int SegmentCount = 1200; + var sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + sender.BindToAnonymousPort(IPAddress.Loopback); + var receiver = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + receiver.Connect(sender.LocalEndPoint); // only receive from sender + EndPoint receiverEndPoint = receiver.LocalEndPoint; + + Barrier b = new Barrier(2); + + Task receiveTask = Task.Run(() => + { + using (receiver) + { + var receiveBuffer = new byte[SegmentCount]; + var receiveSegments = new List>(); + for (int i = 0; i < SegmentCount; i++) + { + receiveSegments.Add(new ArraySegment(receiveBuffer, i, 1)); + } + // receive data as SegmentCount (> IOV_MAX) 1-byte segments. + SocketError error; + // Signal we are ready to receive. + b.SignalAndWait(); + int bytesReceived = receiver.Receive(receiveSegments, SocketFlags.None, out error); + + if (error == SocketError.Success) + { + // platform received message in > IOV_MAX segments + Assert.Equal(SegmentCount, bytesReceived); + } + else + { + // platform returns EMSGSIZE + Assert.Equal(SocketError.MessageSize, error); + Assert.Equal(0, bytesReceived); + } + } + }); + + using (sender) + { + sender.Connect(receiverEndPoint); + + // Synchronize and wait for receiving task to be ready. + b.SignalAndWait(); + + var sendBuffer = new byte[SegmentCount]; + for (int i = 0; i < 10; i++) // UDPRedundancy + { + int bytesSent = sender.Send(sendBuffer); + Assert.Equal(SegmentCount, bytesSent); + await Task.WhenAny(receiveTask, Task.Delay(3)); + if (receiveTask.IsCompleted) + { + break; + } + } + } + + Assert.True(receiveTask.IsCompleted); + await receiveTask; + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] + [PlatformSpecific(~TestPlatforms.Windows)] // All data is sent, even when very large (100M). + public void SocketSendWouldBlock_ReturnsBytesSent() + { + using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + // listen + server.BindToAnonymousPort(IPAddress.Loopback); + server.Listen(1); + // connect + client.Connect(server.LocalEndPoint); + // accept + using (Socket socket = server.Accept()) + { + // We send a large amount of data but don't read it. + // A chunck will be sent, attempts to send more will return SocketError.WouldBlock. + // Socket.Send must return the success of the partial send. + socket.Blocking = false; + var data = new byte[5_000_000]; + SocketError error; + int bytesSent = socket.Send(data, 0, data.Length, SocketFlags.None, out error); + + Assert.Equal(SocketError.Success, error); + Assert.InRange(bytesSent, 1, data.Length - 1); + } + } + } + + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsSubsystemForLinux))] // [ActiveIssue("https://github.com/dotnet/runtime/issues/18258")] + [PlatformSpecific(TestPlatforms.AnyUnix)] + public async Task Socket_ReceiveFlags_Success() + { + using (var sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) + using (var receiver = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) + { + receiver.BindToAnonymousPort(IPAddress.Loopback); + sender.Connect(receiver.LocalEndPoint); + sender.SendBufferSize = 1500; + + var data = new byte[500]; + data[0] = data[499] = 1; + + Assert.Equal(500, sender.Send(data)); + data[0] = data[499] = 2; + Assert.Equal(500, sender.Send(data)); + + var tcs = new TaskCompletionSource(); + SocketAsyncEventArgs args = new SocketAsyncEventArgs(); + + var receiveBufer = new byte[600]; + receiveBufer[0] = data[499] = 0; + + args.SetBuffer(receiveBufer, 0, receiveBufer.Length); + args.Completed += delegate { tcs.SetResult(); }; + + // First peek at the message. + args.SocketFlags = SocketFlags.Peek; + if (receiver.ReceiveAsync(args)) + { + await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); + } + Assert.Equal(SocketFlags.None, args.SocketFlags); + Assert.Equal(1, receiveBufer[0]); + Assert.Equal(1, receiveBufer[499]); + receiveBufer[0] = receiveBufer[499] = 0; + + // Now, we should be able to get same message again. + tcs = new TaskCompletionSource(); + args.SocketFlags = SocketFlags.None; + if (receiver.ReceiveAsync(args)) + { + await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); + } + Assert.Equal(SocketFlags.None, args.SocketFlags); + Assert.Equal(1, receiveBufer[0]); + Assert.Equal(1, receiveBufer[499]); + receiveBufer[0] = receiveBufer[499] = 0; + + // Set buffer smaller than message. + tcs = new TaskCompletionSource(); + args.SetBuffer(receiveBufer, 0, 100); + if (receiver.ReceiveAsync(args)) + { + await tcs.Task.TimeoutAfter(TestSettings.PassingTestTimeout); + } + Assert.Equal(SocketFlags.Truncated, args.SocketFlags); + Assert.Equal(2, receiveBufer[0]); + + // There should be no more data. + Assert.Equal(0, receiver.Available); + } + } + } +} diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveNonParallel.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveNonParallel.cs new file mode 100644 index 0000000000000..02cb9a09abd98 --- /dev/null +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveNonParallel.cs @@ -0,0 +1,147 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace System.Net.Sockets.Tests +{ + [Collection(nameof(NoParallelTests))] + public abstract class SendReceiveNonParallel : SocketTestHelperBase where T : SocketHelperBase, new() + { + public SendReceiveNonParallel(ITestOutputHelper output) : base(output) { } + + public static IEnumerable LoopbackWithBool => + from addr in Loopbacks + from b in new[] { false, true } + select new object[] { addr[0], b }; + + [OuterLoop("Serial execution of all variants takes long")] + [Theory] + [MemberData(nameof(LoopbackWithBool))] + public async Task SendToRecvFrom_Datagram_UDP(IPAddress loopbackAddress, bool useClone) + { + IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; + + const int DatagramSize = 256; + const int DatagramsToSend = 256; + const int ReceiverAckTimeout = 5000; + const int SenderAckTimeout = 10000; + + using var origLeft = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + using var origRight = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + origLeft.BindToAnonymousPort(leftAddress); + origRight.BindToAnonymousPort(rightAddress); + + using var left = useClone ? new Socket(origLeft.SafeHandle) : origLeft; + using var right = useClone ? new Socket(origRight.SafeHandle) : origRight; + + var leftEndpoint = (IPEndPoint)left.LocalEndPoint; + var rightEndpoint = (IPEndPoint)right.LocalEndPoint; + + var receiverAck = new SemaphoreSlim(0); + var senderAck = new SemaphoreSlim(0); + + _output.WriteLine($"{DateTime.Now}: Sending data from {rightEndpoint} to {leftEndpoint}"); + + var receivedChecksums = new uint?[DatagramsToSend]; + Task leftThread = Task.Run(async () => + { + EndPoint remote = leftEndpoint.Create(leftEndpoint.Serialize()); + var recvBuffer = new byte[DatagramSize]; + for (int i = 0; i < DatagramsToSend; i++) + { + SocketReceiveFromResult result = await ReceiveFromAsync( + left, new ArraySegment(recvBuffer), remote); + Assert.Equal(DatagramSize, result.ReceivedBytes); + Assert.Equal(rightEndpoint, result.RemoteEndPoint); + + int datagramId = recvBuffer[0]; + Assert.Null(receivedChecksums[datagramId]); + receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, result.ReceivedBytes); + + receiverAck.Release(); + bool gotAck = await senderAck.WaitAsync(SenderAckTimeout); + Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {SenderAckTimeout} for senderAck in iteration {i}"); + } + }); + + var sentChecksums = new uint[DatagramsToSend]; + using (right) + { + var random = new Random(); + var sendBuffer = new byte[DatagramSize]; + for (int i = 0; i < DatagramsToSend; i++) + { + random.NextBytes(sendBuffer); + sendBuffer[0] = (byte)i; + + int sent = await SendToAsync(right, new ArraySegment(sendBuffer), leftEndpoint); + + bool gotAck = await receiverAck.WaitAsync(ReceiverAckTimeout); + Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {ReceiverAckTimeout} for receiverAck in iteration {i} after sending {sent}. Receiver is in {leftThread.Status}"); + senderAck.Release(); + + Assert.Equal(DatagramSize, sent); + sentChecksums[i] = Fletcher32.Checksum(sendBuffer, 0, sent); + } + } + + await leftThread; + for (int i = 0; i < DatagramsToSend; i++) + { + Assert.NotNull(receivedChecksums[i]); + Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); + } + } + } + + public sealed class SendReceiveNonParallel_Sync : SendReceiveNonParallel + { + public SendReceiveNonParallel_Sync(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_SyncForceNonBlocking : SendReceiveNonParallel + { + public SendReceiveNonParallel_SyncForceNonBlocking(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_Apm : SendReceiveNonParallel + { + public SendReceiveNonParallel_Apm(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_Task : SendReceiveNonParallel + { + public SendReceiveNonParallel_Task(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_Eap : SendReceiveNonParallel + { + public SendReceiveNonParallel_Eap(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_SpanSync : SendReceiveNonParallel + { + public SendReceiveNonParallel_SpanSync(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_SpanSyncForceNonBlocking : SendReceiveNonParallel + { + public SendReceiveNonParallel_SpanSyncForceNonBlocking(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_MemoryArrayTask : SendReceiveNonParallel + { + public SendReceiveNonParallel_MemoryArrayTask(ITestOutputHelper output) : base(output) { } + } + + public sealed class SendReceiveNonParallel_MemoryNativeTask : SendReceiveNonParallel + { + public SendReceiveNonParallel_MemoryNativeTask(ITestOutputHelper output) : base(output) { } + } +} diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveTcpClient.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveTcpClient.cs new file mode 100644 index 0000000000000..503055d98ab09 --- /dev/null +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveTcpClient.cs @@ -0,0 +1,82 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.Sockets.Tests +{ + public sealed class SendReceiveTcpClient : MemberDatas + { + [OuterLoop] + [Theory] + [MemberData(nameof(Loopbacks))] + public async Task SendRecvAsync_TcpListener_TcpClient(IPAddress listenAt) + { + const int BytesToSend = 123456; + const int ListenBacklog = 1; + const int LingerTime = 10; + const int TestTimeout = 30000; + + var listener = new TcpListener(listenAt, 0); + listener.Start(ListenBacklog); + + int bytesReceived = 0; + var receivedChecksum = new Fletcher32(); + Task serverTask = Task.Run(async () => + { + using (TcpClient remote = await listener.AcceptTcpClientAsync()) + using (NetworkStream stream = remote.GetStream()) + { + var recvBuffer = new byte[256]; + while (true) + { + int received = await stream.ReadAsync(recvBuffer, 0, recvBuffer.Length); + if (received == 0) + { + break; + } + + bytesReceived += received; + receivedChecksum.Add(recvBuffer, 0, received); + } + } + }); + + int bytesSent = 0; + var sentChecksum = new Fletcher32(); + Task clientTask = Task.Run(async () => + { + var clientEndpoint = (IPEndPoint)listener.LocalEndpoint; + + using (var client = new TcpClient(clientEndpoint.AddressFamily)) + { + await client.ConnectAsync(clientEndpoint.Address, clientEndpoint.Port); + + using (NetworkStream stream = client.GetStream()) + { + var random = new Random(); + var sendBuffer = new byte[512]; + for (int remaining = BytesToSend, sent = 0; remaining > 0; remaining -= sent) + { + random.NextBytes(sendBuffer); + + sent = Math.Min(sendBuffer.Length, remaining); + await stream.WriteAsync(sendBuffer, 0, sent); + + bytesSent += sent; + sentChecksum.Add(sendBuffer, 0, sent); + } + + client.LingerState = new LingerOption(true, LingerTime); + } + } + }); + + await (new[] { serverTask, clientTask }).WhenAllOrAnyFailed(TestTimeout); + + Assert.Equal(bytesSent, bytesReceived); + Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); + } + } +} diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveUdpClient.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveUdpClient.cs new file mode 100644 index 0000000000000..ce83af9bb677d --- /dev/null +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/SendReceive/SendReceiveUdpClient.cs @@ -0,0 +1,89 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.Sockets.Tests +{ + public sealed class SendReceiveUdpClient : MemberDatas + { + [OuterLoop] + [Theory] + [MemberData(nameof(Loopbacks))] + public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackAddress) + { + IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; + + const int DatagramSize = 256; + const int DatagramsToSend = 256; + const int AckTimeout = 20000; + const int TestTimeout = 60000; + + using (var left = new UdpClient(new IPEndPoint(leftAddress, 0))) + using (var right = new UdpClient(new IPEndPoint(rightAddress, 0))) + { + var leftEndpoint = (IPEndPoint)left.Client.LocalEndPoint; + var rightEndpoint = (IPEndPoint)right.Client.LocalEndPoint; + + var receiverAck = new ManualResetEventSlim(); + var senderAck = new ManualResetEventSlim(); + + var receivedChecksums = new uint?[DatagramsToSend]; + int receivedDatagrams = 0; + + Task receiverTask = Task.Run(async () => + { + for (; receivedDatagrams < DatagramsToSend; receivedDatagrams++) + { + UdpReceiveResult result = await left.ReceiveAsync(); + + receiverAck.Set(); + Assert.True(senderAck.Wait(AckTimeout)); + senderAck.Reset(); + + Assert.Equal(DatagramSize, result.Buffer.Length); + Assert.Equal(rightEndpoint, result.RemoteEndPoint); + + int datagramId = (int)result.Buffer[0]; + Assert.Null(receivedChecksums[datagramId]); + + receivedChecksums[datagramId] = Fletcher32.Checksum(result.Buffer, 0, result.Buffer.Length); + } + }); + + var sentChecksums = new uint[DatagramsToSend]; + int sentDatagrams = 0; + + Task senderTask = Task.Run(async () => + { + var random = new Random(); + var sendBuffer = new byte[DatagramSize]; + + for (; sentDatagrams < DatagramsToSend; sentDatagrams++) + { + random.NextBytes(sendBuffer); + sendBuffer[0] = (byte)sentDatagrams; + + int sent = await right.SendAsync(sendBuffer, DatagramSize, leftEndpoint); + + Assert.True(receiverAck.Wait(AckTimeout)); + receiverAck.Reset(); + senderAck.Set(); + + Assert.Equal(DatagramSize, sent); + sentChecksums[sentDatagrams] = Fletcher32.Checksum(sendBuffer, 0, sent); + } + }); + + await (new[] { receiverTask, senderTask }).WhenAllOrAnyFailed(TestTimeout); + for (int i = 0; i < DatagramsToSend; i++) + { + Assert.NotNull(receivedChecksums[i]); + Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); + } + } + } + } +} diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj b/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj index fee81761de6fe..e2522c8009bcb 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/System.Net.Sockets.Tests.csproj @@ -34,7 +34,11 @@ - + + + + + diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs index 144557c77636e..d007ab14bfa44 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs @@ -349,17 +349,17 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn { // Invoke several tests to execute code paths while tracing is enabled - await new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); - await new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); + await new SendReceive_Sync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); + await new SendReceive_Sync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); - await new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); - await new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); + await new SendReceive_Task(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); + await new SendReceive_Task(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); - await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); - await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); + await new SendReceive_Eap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); + await new SendReceive_Eap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); - await new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); - await new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); + await new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false); + await new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false); await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false); await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false);