diff --git a/src/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs b/src/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs index fd60f962e8d5..4a032e80bc7b 100644 --- a/src/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs +++ b/src/System.Net.Sockets/tests/FunctionalTests/SendReceive.cs @@ -11,7 +11,7 @@ namespace System.Net.Sockets.Tests { - public class SendReceive + public abstract class SendReceive : MemberDatas { private readonly ITestOutputHelper _log; @@ -20,11 +20,23 @@ public SendReceive(ITestOutputHelper output) _log = output; } + public abstract Task AcceptAsync(Socket s); + public abstract Task ConnectAsync(Socket s, EndPoint endPoint); + public abstract Task ReceiveAsync(Socket s, ArraySegment buffer); + public abstract Task ReceiveFromAsync( + Socket s, ArraySegment buffer, EndPoint endPoint); + public abstract Task ReceiveAsync(Socket s, IList> bufferList); + public abstract Task SendAsync(Socket s, ArraySegment buffer); + public abstract Task SendAsync(Socket s, IList> bufferList); + public abstract Task SendToAsync(Socket s, ArraySegment buffer, EndPoint endpoint); + [OuterLoop] // TODO: Issue #11345 [Theory] - [MemberData(nameof(LoopbacksToSameLoopback))] - public static void SendToRecvFrom_Datagram_UDP(IPAddress leftAddress, IPAddress rightAddress) + [MemberData(nameof(Loopbacks))] + public async Task SendToRecvFrom_Datagram_UDP(IPAddress loopbackAddress) { + IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; + // TODO #5185: Harden against packet loss const int DatagramSize = 256; const int DatagramsToSend = 256; @@ -40,11 +52,11 @@ public static void SendToRecvFrom_Datagram_UDP(IPAddress leftAddress, IPAddress var leftEndpoint = (IPEndPoint)left.LocalEndPoint; var rightEndpoint = (IPEndPoint)right.LocalEndPoint; - var receiverAck = new ManualResetEventSlim(); - var senderAck = new ManualResetEventSlim(); + var receiverAck = new SemaphoreSlim(0); + var senderAck = new SemaphoreSlim(0); var receivedChecksums = new uint?[DatagramsToSend]; - var leftThread = new Thread(() => + Task leftThread = Task.Run(async () => { using (left) { @@ -52,23 +64,21 @@ public static void SendToRecvFrom_Datagram_UDP(IPAddress leftAddress, IPAddress var recvBuffer = new byte[DatagramSize]; for (int i = 0; i < DatagramsToSend; i++) { - int received = left.ReceiveFrom(recvBuffer, SocketFlags.None, ref remote); - Assert.Equal(DatagramSize, received); - Assert.Equal(rightEndpoint, remote); + SocketReceiveFromResult result = await ReceiveFromAsync( + left, new ArraySegment(recvBuffer), remote); + Assert.Equal(DatagramSize, result.ReceivedBytes); + Assert.Equal(rightEndpoint, result.RemoteEndPoint); - int datagramId = (int)recvBuffer[0]; + int datagramId = recvBuffer[0]; Assert.Null(receivedChecksums[datagramId]); - receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, received); + receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, result.ReceivedBytes); - receiverAck.Set(); - Assert.True(senderAck.Wait(AckTimeout)); - senderAck.Reset(); + receiverAck.Release(); + Assert.True(await senderAck.WaitAsync(TestTimeout)); } } }); - leftThread.Start(); - var sentChecksums = new uint[DatagramsToSend]; using (right) { @@ -79,18 +89,17 @@ public static void SendToRecvFrom_Datagram_UDP(IPAddress leftAddress, IPAddress random.NextBytes(sendBuffer); sendBuffer[0] = (byte)i; - int sent = right.SendTo(sendBuffer, SocketFlags.None, leftEndpoint); + int sent = await SendToAsync(right, new ArraySegment(sendBuffer), leftEndpoint); - Assert.True(receiverAck.Wait(AckTimeout)); - receiverAck.Reset(); - senderAck.Set(); + Assert.True(await receiverAck.WaitAsync(AckTimeout)); + senderAck.Release(); Assert.Equal(DatagramSize, sent); sentChecksums[i] = Fletcher32.Checksum(sendBuffer, 0, sent); } } - Assert.True(leftThread.Join(TestTimeout)); + await leftThread; for (int i = 0; i < DatagramsToSend; i++) { Assert.NotNull(receivedChecksums[i]); @@ -98,163 +107,30 @@ public static void SendToRecvFrom_Datagram_UDP(IPAddress leftAddress, IPAddress } } - [OuterLoop] // TODO: Issue #11345 - [Theory] - [MemberData(nameof(LoopbacksToSameLoopback))] - public static void SendToRecvFromAPM_Datagram_UDP(IPAddress leftAddress, IPAddress rightAddress) - { - // TODO #5185: Harden against packet loss - const int DatagramSize = 256; - const int DatagramsToSend = 256; - const int AckTimeout = 1000; - const int TestTimeout = 30000; - - var left = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - left.BindToAnonymousPort(leftAddress); - - var right = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - right.BindToAnonymousPort(rightAddress); - - var leftEndpoint = (IPEndPoint)left.LocalEndPoint; - var rightEndpoint = (IPEndPoint)right.LocalEndPoint; - - var receiverAck = new ManualResetEventSlim(); - var senderAck = new ManualResetEventSlim(); - - EndPoint receiveRemote = leftEndpoint.Create(leftEndpoint.Serialize()); - var receiverFinished = new TaskCompletionSource(); - var receivedChecksums = new uint?[DatagramsToSend]; - var receiveBuffer = new byte[DatagramSize]; - int receivedDatagrams = -1; - - Action receiveHandler = null; - receiveHandler = (received, remote) => - { - try - { - if (receivedDatagrams != -1) - { - Assert.Equal(DatagramSize, received); - Assert.Equal(rightEndpoint, remote); - - int datagramId = (int)receiveBuffer[0]; - Assert.Null(receivedChecksums[datagramId]); - receivedChecksums[datagramId] = Fletcher32.Checksum(receiveBuffer, 0, received); - - receiverAck.Set(); - Assert.True(senderAck.Wait(AckTimeout)); - senderAck.Reset(); - - receivedDatagrams++; - if (receivedDatagrams == DatagramsToSend) - { - left.Dispose(); - receiverFinished.SetResult(true); - return; - } - } - else - { - receivedDatagrams = 0; - } - - left.ReceiveFromAPM(receiveBuffer, 0, receiveBuffer.Length, SocketFlags.None, receiveRemote, receiveHandler); - } - catch (Exception ex) - { - receiverFinished.SetException(ex); - } - }; - - receiveHandler(0, null); - - var random = new Random(); - var senderFinished = new TaskCompletionSource(); - var sentChecksums = new uint[DatagramsToSend]; - var sendBuffer = new byte[DatagramSize]; - int sentDatagrams = -1; - - Action sendHandler = null; - sendHandler = sent => - { - try - { - if (sentDatagrams != -1) - { - Assert.True(receiverAck.Wait(AckTimeout)); - receiverAck.Reset(); - senderAck.Set(); - - Assert.Equal(DatagramSize, sent); - sentChecksums[sentDatagrams] = Fletcher32.Checksum(sendBuffer, 0, sent); - - sentDatagrams++; - if (sentDatagrams == DatagramsToSend) - { - right.Dispose(); - senderFinished.SetResult(true); - return; - } - } - else - { - sentDatagrams = 0; - } - - random.NextBytes(sendBuffer); - sendBuffer[0] = (byte)sentDatagrams; - right.SendToAPM(sendBuffer, 0, sendBuffer.Length, SocketFlags.None, leftEndpoint, sendHandler); - } - catch (Exception ex) - { - senderFinished.SetException(ex); - } - }; - - sendHandler(0); - - Assert.True(receiverFinished.Task.Wait(TestTimeout)); - Assert.True(senderFinished.Task.Wait(TestTimeout)); - - for (int i = 0; i < DatagramsToSend; i++) - { - Assert.NotNull(receivedChecksums[i]); - Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); - } - } - [OuterLoop] // TODO: Issue #11345 [Theory] [MemberData(nameof(LoopbacksAndBuffers))] - public static void SendRecv_Stream_TCP(IPAddress listenAt, bool useMultipleBuffers) + public async Task SendRecv_Stream_TCP(IPAddress listenAt, bool useMultipleBuffers) { - const int BytesToSend = 123456; - const int ListenBacklog = 1; - const int LingerTime = 10; - const int TestTimeout = 30000; - - var server = new Socket(listenAt.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - server.BindToAnonymousPort(listenAt); + const int BytesToSend = 123456, ListenBacklog = 1, LingerTime = 1; + int bytesReceived = 0, bytesSent = 0; + Fletcher32 receivedChecksum = new Fletcher32(), sentChecksum = new Fletcher32(); - server.Listen(ListenBacklog); - - int bytesReceived = 0; - var receivedChecksum = new Fletcher32(); - var serverThread = new Thread(() => + using (var server = new Socket(listenAt.AddressFamily, SocketType.Stream, ProtocolType.Tcp)) { - using (server) - { - Socket remote = server.Accept(); - Assert.NotNull(remote); + server.BindToAnonymousPort(listenAt); + server.Listen(ListenBacklog); - using (remote) + Task serverProcessingTask = Task.Run(async () => + { + using (Socket remote = await AcceptAsync(server)) { if (!useMultipleBuffers) { var recvBuffer = new byte[256]; for (;;) { - int received = remote.Receive(recvBuffer, 0, recvBuffer.Length, SocketFlags.None); + int received = await ReceiveAsync(remote, new ArraySegment(recvBuffer)); if (received == 0) { break; @@ -270,12 +146,10 @@ public static void SendRecv_Stream_TCP(IPAddress listenAt, bool useMultipleBuffe new ArraySegment(new byte[123]), new ArraySegment(new byte[256], 2, 100), new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 33) - }; - + new ArraySegment(new byte[64], 9, 33)}; for (;;) { - int received = remote.Receive(recvBuffers, SocketFlags.None); + int received = await ReceiveAsync(remote, recvBuffers); if (received == 0) { break; @@ -290,389 +164,197 @@ public static void SendRecv_Stream_TCP(IPAddress listenAt, bool useMultipleBuffe remaining -= toAdd; } } + } } - } - }); - serverThread.Start(); - - EndPoint clientEndpoint = server.LocalEndPoint; - var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - client.Connect(clientEndpoint); - - int bytesSent = 0; - var sentChecksum = new Fletcher32(); - using (client) - { - var random = new Random(); + }); - if (!useMultipleBuffers) + EndPoint clientEndpoint = server.LocalEndPoint; + using (var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)) { - var sendBuffer = new byte[512]; - for (int sent = 0, remaining = BytesToSend; remaining > 0; remaining -= sent) - { - random.NextBytes(sendBuffer); + await ConnectAsync(client, clientEndpoint); - sent = client.Send(sendBuffer, 0, Math.Min(sendBuffer.Length, remaining), SocketFlags.None); - bytesSent += sent; - sentChecksum.Add(sendBuffer, 0, sent); + var random = new Random(); + if (!useMultipleBuffers) + { + var sendBuffer = new byte[512]; + for (int sent = 0, remaining = BytesToSend; remaining > 0; remaining -= sent) + { + random.NextBytes(sendBuffer); + sent = await SendAsync(client, new ArraySegment(sendBuffer, 0, Math.Min(sendBuffer.Length, remaining))); + bytesSent += sent; + sentChecksum.Add(sendBuffer, 0, sent); + } } - } - else - { - var sendBuffers = new List> { + else + { + var sendBuffers = new List> { new ArraySegment(new byte[23]), new ArraySegment(new byte[256], 2, 100), new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 9) - }; - - for (int sent = 0, toSend = BytesToSend; toSend > 0; toSend -= sent) - { - for (int i = 0; i < sendBuffers.Count; i++) + new ArraySegment(new byte[64], 9, 9)}; + for (int sent = 0, toSend = BytesToSend; toSend > 0; toSend -= sent) { - random.NextBytes(sendBuffers[i].Array); - } + for (int i = 0; i < sendBuffers.Count; i++) + { + random.NextBytes(sendBuffers[i].Array); + } - sent = client.Send(sendBuffers, SocketFlags.None); + sent = await SendAsync(client, sendBuffers); - bytesSent += sent; - for (int i = 0, remaining = sent; i < sendBuffers.Count && remaining > 0; i++) - { - ArraySegment buffer = sendBuffers[i]; - int toAdd = Math.Min(buffer.Count, remaining); - sentChecksum.Add(buffer.Array, buffer.Offset, toAdd); - remaining -= toAdd; + bytesSent += sent; + for (int i = 0, remaining = sent; i < sendBuffers.Count && remaining > 0; i++) + { + ArraySegment buffer = sendBuffers[i]; + int toAdd = Math.Min(buffer.Count, remaining); + sentChecksum.Add(buffer.Array, buffer.Offset, toAdd); + remaining -= toAdd; + } } } + + client.LingerState = new LingerOption(true, LingerTime); + client.Shutdown(SocketShutdown.Both); + await serverProcessingTask; } - client.LingerState = new LingerOption(true, LingerTime); + Assert.Equal(bytesSent, bytesReceived); + Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); } - - Assert.True(serverThread.Join(TestTimeout), "Completed within allowed time"); - - Assert.Equal(bytesSent, bytesReceived); - Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); } [OuterLoop] // TODO: Issue #11345 [Theory] [MemberData(nameof(LoopbacksAndBuffers))] - public static void SendRecvAPM_Stream_TCP(IPAddress listenAt, bool useMultipleBuffers) + public void SendRecvPollSync_TcpListener_Socket(IPAddress listenAt, bool pollBeforeOperation) { const int BytesToSend = 123456; const int ListenBacklog = 1; - const int LingerTime = 10; const int TestTimeout = 30000; - var server = new Socket(listenAt.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - server.BindToAnonymousPort(listenAt); - - server.Listen(ListenBacklog); - - var serverFinished = new TaskCompletionSource(); - int bytesReceived = 0; - var receivedChecksum = new Fletcher32(); - - server.AcceptAPM(remote => + var listener = new TcpListener(listenAt, 0); + listener.Start(ListenBacklog); + try { - Action recvHandler = null; - bool first = true; - - if (!useMultipleBuffers) + int bytesReceived = 0; + var receivedChecksum = new Fletcher32(); + Task serverTask = Task.Run(async () => { - var recvBuffer = new byte[256]; - recvHandler = received => + using (Socket remote = await listener.AcceptSocketAsync()) { - if (!first) + var recvBuffer = new byte[256]; + while (true) { + if (pollBeforeOperation) + { + Assert.True(remote.Poll(-1, SelectMode.SelectRead), "Read poll before completion should have succeeded"); + } + int received = remote.Receive(recvBuffer, 0, recvBuffer.Length, SocketFlags.None); if (received == 0) { - remote.Dispose(); - server.Dispose(); - serverFinished.SetResult(true); - return; + Assert.True(remote.Poll(0, SelectMode.SelectRead), "Read poll after completion should have succeeded"); + break; } bytesReceived += received; receivedChecksum.Add(recvBuffer, 0, received); } - else - { - first = false; - } + } + }); - remote.ReceiveAPM(recvBuffer, 0, recvBuffer.Length, SocketFlags.None, recvHandler); - }; - } - else + int bytesSent = 0; + var sentChecksum = new Fletcher32(); + Task clientTask = Task.Run(async () => { - var recvBuffers = new List> { - new ArraySegment(new byte[123]), - new ArraySegment(new byte[256], 2, 100), - new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 33) - }; + var clientEndpoint = (IPEndPoint)listener.LocalEndpoint; - recvHandler = received => + using (var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)) { - if (!first) - { - if (received == 0) - { - remote.Dispose(); - server.Dispose(); - serverFinished.SetResult(true); - return; - } + await client.ConnectAsync(clientEndpoint.Address, clientEndpoint.Port); - bytesReceived += received; - for (int i = 0, remaining = received; i < recvBuffers.Count && remaining > 0; i++) - { - ArraySegment buffer = recvBuffers[i]; - int toAdd = Math.Min(buffer.Count, remaining); - receivedChecksum.Add(buffer.Array, buffer.Offset, toAdd); - remaining -= toAdd; - } - } - else + if (pollBeforeOperation) { - first = false; + Assert.False(client.Poll(TestTimeout, SelectMode.SelectRead), "Expected writer's read poll to fail after timeout"); } - remote.ReceiveAPM(recvBuffers, SocketFlags.None, recvHandler); - }; - } - - recvHandler(0); - }); - - EndPoint clientEndpoint = server.LocalEndPoint; - var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - - int bytesSent = 0; - var sentChecksum = new Fletcher32(); - - client.ConnectAPM(clientEndpoint, () => - { - Action sendHandler = null; - var random = new Random(); - var remaining = BytesToSend; - bool first = true; - - if (!useMultipleBuffers) - { - var sendBuffer = new byte[512]; - sendHandler = sent => - { - if (!first) + var random = new Random(); + var sendBuffer = new byte[512]; + for (int remaining = BytesToSend, sent = 0; remaining > 0; remaining -= sent) { - bytesSent += sent; - sentChecksum.Add(sendBuffer, 0, sent); + random.NextBytes(sendBuffer); - remaining -= sent; - if (remaining <= 0) + if (pollBeforeOperation) { - client.LingerState = new LingerOption(true, LingerTime); - client.Dispose(); - return; + Assert.True(client.Poll(-1, SelectMode.SelectWrite), "Write poll should have succeeded"); } - } - else - { - first = false; - } - - random.NextBytes(sendBuffer); - client.SendAPM(sendBuffer, 0, Math.Min(sendBuffer.Length, remaining), SocketFlags.None, sendHandler); - }; - } - else - { - var sendBuffers = new List> { - new ArraySegment(new byte[23]), - new ArraySegment(new byte[256], 2, 100), - new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 9) - }; + sent = client.Send(sendBuffer, 0, Math.Min(sendBuffer.Length, remaining), SocketFlags.None); - sendHandler = sent => - { - if (!first) - { bytesSent += sent; - for (int i = 0, r = sent; i < sendBuffers.Count && r > 0; i++) - { - ArraySegment buffer = sendBuffers[i]; - int toAdd = Math.Min(buffer.Count, r); - sentChecksum.Add(buffer.Array, buffer.Offset, toAdd); - r -= toAdd; - } - - remaining -= sent; - if (remaining <= 0) - { - client.LingerState = new LingerOption(true, LingerTime); - client.Dispose(); - return; - } - } - else - { - first = false; - } - - for (int i = 0; i < sendBuffers.Count; i++) - { - random.NextBytes(sendBuffers[i].Array); + sentChecksum.Add(sendBuffer, 0, sent); } - client.SendAPM(sendBuffers, SocketFlags.None, sendHandler); - }; - } - - sendHandler(0); - }); + } + }); - Assert.True(serverFinished.Task.Wait(TestTimeout), "Completed within allowed time"); + Assert.True(Task.WaitAll(new[] { serverTask, clientTask }, TestTimeout), "Wait timed out"); - Assert.Equal(bytesSent, bytesReceived); - Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); + Assert.Equal(bytesSent, bytesReceived); + Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); + } + finally + { + listener.Stop(); + } } - [OuterLoop] // TODO: Issue #11345 - [Theory] - [MemberData(nameof(LoopbacksToSameLoopback))] - public void SendToRecvFromAsync_Datagram_UDP(IPAddress leftAddress, IPAddress rightAddress) + [ActiveIssue(13778, TestPlatforms.OSX)] + [Fact] + public async Task SendRecv_0ByteReceive_Success() { - // TODO #5185: harden against packet loss - const int DatagramSize = 256; - const int DatagramsToSend = 256; - const int AckTimeout = 1000; - const int TestTimeout = 30000; - - var left = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - var leftEventArgs = new SocketAsyncEventArgs(); - left.BindToAnonymousPort(leftAddress); - - var right = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - var rightEventArgs = new SocketAsyncEventArgs(); - right.BindToAnonymousPort(rightAddress); - - var leftEndpoint = (IPEndPoint)left.LocalEndPoint; - var rightEndpoint = (IPEndPoint)right.LocalEndPoint; - - var receiverAck = new ManualResetEventSlim(); - var senderAck = new ManualResetEventSlim(); - - EndPoint receiveRemote = leftEndpoint.Create(leftEndpoint.Serialize()); - var receiverFinished = new TaskCompletionSource(); - var receivedChecksums = new uint?[DatagramsToSend]; - var receiveBuffer = new byte[DatagramSize]; - int receivedDatagrams = -1; - - Action receiveHandler = null; - receiveHandler = (received, remote) => + using (Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + using (Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { - try - { - if (receivedDatagrams != -1) - { - Assert.Equal(DatagramSize, received); - Assert.Equal(rightEndpoint, remote); - - int datagramId = (int)receiveBuffer[0]; - Assert.Null(receivedChecksums[datagramId]); - - receivedChecksums[datagramId] = Fletcher32.Checksum(receiveBuffer, 0, received); - - receiverAck.Set(); - Assert.True(senderAck.Wait(AckTimeout)); - senderAck.Reset(); - - receivedDatagrams++; - if (receivedDatagrams == DatagramsToSend) - { - left.Dispose(); - receiverFinished.SetResult(true); - return; - } - } - else - { - receivedDatagrams = 0; - } - - left.ReceiveFromAsync(leftEventArgs, receiveBuffer, 0, receiveBuffer.Length, SocketFlags.None, receiveRemote, receiveHandler); - } - catch (Exception ex) - { - receiverFinished.SetException(ex); - } - }; - - receiveHandler(0, null); + listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + listener.Listen(1); - var random = new Random(); - var senderFinished = new TaskCompletionSource(); - var sentChecksums = new uint[DatagramsToSend]; - var sendBuffer = new byte[DatagramSize]; - int sentDatagrams = -1; + Task acceptTask = AcceptAsync(listener); + await Task.WhenAll( + acceptTask, + ConnectAsync(client, new IPEndPoint(IPAddress.Loopback, ((IPEndPoint)listener.LocalEndPoint).Port))); - Action sendHandler = null; - sendHandler = sent => - { - try + using (Socket server = await acceptTask) { - if (sentDatagrams != -1) - { - Assert.True(receiverAck.Wait(AckTimeout)); - receiverAck.Reset(); - senderAck.Set(); - - Assert.Equal(DatagramSize, sent); - sentChecksums[sentDatagrams] = Fletcher32.Checksum(sendBuffer, 0, sent); - - sentDatagrams++; - if (sentDatagrams == DatagramsToSend) - { - right.Dispose(); - senderFinished.SetResult(true); - return; - } - } - else + for (int i = 0; i < 3; i++) { - sentDatagrams = 0; - } - - random.NextBytes(sendBuffer); - sendBuffer[0] = (byte)sentDatagrams; - right.SendToAsync(rightEventArgs, sendBuffer, 0, sendBuffer.Length, SocketFlags.None, leftEndpoint, sendHandler); - } - catch (Exception ex) - { - senderFinished.SetException(ex); - } - }; + // Have the client do a 0-byte receive. No data is available, so this should pend. + Task receive = ReceiveAsync(client, new ArraySegment(Array.Empty())); + Assert.False(receive.IsCompleted); + Assert.Equal(0, client.Available); - sendHandler(0); + // Have the server send 1 byte to the client. + Assert.Equal(1, server.Send(new byte[1], 0, 1, SocketFlags.None)); - Assert.True(receiverFinished.Task.Wait(TestTimeout)); - Assert.True(senderFinished.Task.Wait(TestTimeout)); + // The client should now wake up, getting 0 bytes with 1 byte available. + Assert.Equal(0, await receive); + Assert.Equal(1, client.Available); // Due to #13778, this sometimes fails on macOS - for (int i = 0; i < DatagramsToSend; i++) - { - Assert.NotNull(receivedChecksums[i]); - Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]); + // Receive that byte + Assert.Equal(1, await ReceiveAsync(client, new ArraySegment(new byte[1]))); + Assert.Equal(0, client.Available); + } + } } } + } + public sealed class SendReceiveUdpClient : MemberDatas + { [OuterLoop] // TODO: Issue #11345 [Theory] - [MemberData(nameof(LoopbacksToSameLoopback))] - public void SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress leftAddress, IPAddress rightAddress) + [MemberData(nameof(Loopbacks))] + public void SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackAddress) { + IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress; + // TODO #5185: harden against packet loss const int DatagramSize = 256; const int DatagramsToSend = 256; @@ -743,195 +425,10 @@ public void SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress leftAddress, IP } } } + } - [OuterLoop] // TODO: Issue #11345 - [Theory] - [MemberData(nameof(LoopbacksAndBuffers))] - public void SendRecvAsync_Stream_TCP(IPAddress listenAt, bool useMultipleBuffers) - { - const int BytesToSend = 123456; - const int ListenBacklog = 1; - const int LingerTime = 60; - const int TestTimeout = 30000; - - var server = new Socket(listenAt.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - server.BindToAnonymousPort(listenAt); - - server.Listen(ListenBacklog); - - var serverFinished = new TaskCompletionSource(); - int bytesReceived = 0; - var receivedChecksum = new Fletcher32(); - - var serverEventArgs = new SocketAsyncEventArgs(); - server.AcceptAsync(serverEventArgs, remote => - { - Action recvHandler = null; - bool first = true; - - if (!useMultipleBuffers) - { - var recvBuffer = new byte[256]; - recvHandler = received => - { - if (!first) - { - if (received == 0) - { - remote.Dispose(); - server.Dispose(); - serverFinished.SetResult(true); - return; - } - - bytesReceived += received; - receivedChecksum.Add(recvBuffer, 0, received); - } - else - { - first = false; - } - - remote.ReceiveAsync(serverEventArgs, recvBuffer, 0, recvBuffer.Length, SocketFlags.None, recvHandler); - }; - } - else - { - var recvBuffers = new List> { - new ArraySegment(new byte[123]), - new ArraySegment(new byte[256], 2, 100), - new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 33) - }; - - recvHandler = received => - { - if (!first) - { - if (received == 0) - { - remote.Dispose(); - server.Dispose(); - serverFinished.SetResult(true); - return; - } - - bytesReceived += received; - for (int i = 0, remaining = received; i < recvBuffers.Count && remaining > 0; i++) - { - ArraySegment buffer = recvBuffers[i]; - int toAdd = Math.Min(buffer.Count, remaining); - receivedChecksum.Add(buffer.Array, buffer.Offset, toAdd); - remaining -= toAdd; - } - } - else - { - first = false; - } - - remote.ReceiveAsync(serverEventArgs, recvBuffers, SocketFlags.None, recvHandler); - }; - } - - recvHandler(0); - }); - - EndPoint clientEndpoint = server.LocalEndPoint; - var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - - int bytesSent = 0; - var sentChecksum = new Fletcher32(); - - var clientEventArgs = new SocketAsyncEventArgs(); - client.ConnectAsync(clientEventArgs, clientEndpoint, () => - { - Action sendHandler = null; - var random = new Random(); - var remaining = BytesToSend; - bool first = true; - - if (!useMultipleBuffers) - { - var sendBuffer = new byte[512]; - sendHandler = sent => - { - if (!first) - { - bytesSent += sent; - sentChecksum.Add(sendBuffer, 0, sent); - - remaining -= sent; - Assert.True(remaining >= 0); - if (remaining == 0) - { - client.LingerState = new LingerOption(true, LingerTime); - client.Dispose(); - return; - } - } - else - { - first = false; - } - - random.NextBytes(sendBuffer); - client.SendAsync(clientEventArgs, sendBuffer, 0, Math.Min(sendBuffer.Length, remaining), SocketFlags.None, sendHandler); - }; - } - else - { - var sendBuffers = new List> { - new ArraySegment(new byte[23]), - new ArraySegment(new byte[256], 2, 100), - new ArraySegment(new byte[1], 0, 0), - new ArraySegment(new byte[64], 9, 9) - }; - - sendHandler = sent => - { - if (!first) - { - bytesSent += sent; - for (int i = 0, r = sent; i < sendBuffers.Count && r > 0; i++) - { - ArraySegment buffer = sendBuffers[i]; - int toAdd = Math.Min(buffer.Count, r); - sentChecksum.Add(buffer.Array, buffer.Offset, toAdd); - r -= toAdd; - } - - remaining -= sent; - if (remaining <= 0) - { - client.LingerState = new LingerOption(true, LingerTime); - client.Dispose(); - return; - } - } - else - { - first = false; - } - - for (int i = 0; i < sendBuffers.Count; i++) - { - random.NextBytes(sendBuffers[i].Array); - } - - client.SendAsync(clientEventArgs, sendBuffers, SocketFlags.None, sendHandler); - }; - } - - sendHandler(0); - }); - - Assert.True(serverFinished.Task.Wait(TestTimeout), "Completed within allowed time"); - - Assert.Equal(bytesSent, bytesReceived); - Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); - } - + public sealed class SendReceiveListener : MemberDatas + { [OuterLoop] // TODO: Issue #11345 [Theory] [MemberData(nameof(Loopbacks))] @@ -1002,138 +499,172 @@ public void SendRecvAsync_TcpListener_TcpClient(IPAddress listenAt) Assert.Equal(bytesSent, bytesReceived); Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); } + } - [OuterLoop] // TODO: Issue #11345 - [Theory] - [MemberData(nameof(LoopbacksAndBuffers))] - public void SendRecvPollSync_TcpListener_Socket(IPAddress listenAt, bool pollBeforeOperation) - { - const int BytesToSend = 123456; - const int ListenBacklog = 1; - const int TestTimeout = 30000; - - var listener = new TcpListener(listenAt, 0); - listener.Start(ListenBacklog); - try + public sealed class SendReceiveSync : SendReceive + { + public SendReceiveSync(ITestOutputHelper output) : base(output) { } + public override Task AcceptAsync(Socket s) => + Task.Run(() => s.Accept()); + public override Task ConnectAsync(Socket s, EndPoint endPoint) => + Task.Run(() => s.Connect(endPoint)); + public override Task ReceiveAsync(Socket s, ArraySegment buffer) => + Task.Run(() => s.Receive(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None)); + public override Task ReceiveAsync(Socket s, IList> bufferList) => + Task.Run(() => s.Receive(bufferList, SocketFlags.None)); + public override Task ReceiveFromAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + Task.Run(() => { - int bytesReceived = 0; - var receivedChecksum = new Fletcher32(); - Task serverTask = Task.Run(async () => + int received = s.ReceiveFrom(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, ref endPoint); + return new SocketReceiveFromResult { - using (Socket remote = await listener.AcceptSocketAsync()) - { - var recvBuffer = new byte[256]; - while (true) - { - if (pollBeforeOperation) - { - Assert.True(remote.Poll(-1, SelectMode.SelectRead), "Read poll before completion should have succeeded"); - } - int received = remote.Receive(recvBuffer, 0, recvBuffer.Length, SocketFlags.None); - if (received == 0) - { - Assert.True(remote.Poll(0, SelectMode.SelectRead), "Read poll after completion should have succeeded"); - break; - } - - bytesReceived += received; - receivedChecksum.Add(recvBuffer, 0, received); - } - } - }); + ReceivedBytes = received, + RemoteEndPoint = endPoint + }; + }); + public override Task SendAsync(Socket s, ArraySegment buffer) => + Task.Run(() => s.Send(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None)); + public override Task SendAsync(Socket s, IList> bufferList) => + Task.Run(() => s.Send(bufferList, SocketFlags.None)); + public override Task SendToAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + Task.Run(() => s.SendTo(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, endPoint)); + } - int bytesSent = 0; - var sentChecksum = new Fletcher32(); - Task clientTask = Task.Run(async () => + public sealed class SendReceiveApm : SendReceive + { + public SendReceiveApm(ITestOutputHelper output) : base(output) { } + public override Task AcceptAsync(Socket s) => + Task.Factory.FromAsync(s.BeginAccept, s.EndAccept, null); + public override Task ConnectAsync(Socket s, EndPoint endPoint) => + Task.Factory.FromAsync(s.BeginConnect, s.EndConnect, endPoint, null); + public override Task ReceiveAsync(Socket s, ArraySegment buffer) => + Task.Factory.FromAsync((callback, state) => + s.BeginReceive(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, callback, state), + s.EndReceive, null); + public override Task ReceiveAsync(Socket s, IList> bufferList) => + Task.Factory.FromAsync(s.BeginReceive, s.EndReceive, bufferList, SocketFlags.None, null); + public override Task ReceiveFromAsync(Socket s, ArraySegment buffer, EndPoint endPoint) + { + var tcs = new TaskCompletionSource(); + s.BeginReceiveFrom(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, ref endPoint, iar => + { + try { - var clientEndpoint = (IPEndPoint)listener.LocalEndpoint; - - using (var client = new Socket(clientEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)) + int receivedBytes = s.EndReceiveFrom(iar, ref endPoint); + tcs.TrySetResult(new SocketReceiveFromResult { - await client.ConnectAsync(clientEndpoint.Address, clientEndpoint.Port); - - if (pollBeforeOperation) - { - Assert.False(client.Poll(TestTimeout, SelectMode.SelectRead), "Expected writer's read poll to fail after timeout"); - } - - var random = new Random(); - var sendBuffer = new byte[512]; - for (int remaining = BytesToSend, sent = 0; remaining > 0; remaining -= sent) - { - random.NextBytes(sendBuffer); - - if (pollBeforeOperation) - { - Assert.True(client.Poll(-1, SelectMode.SelectWrite), "Write poll should have succeeded"); - } - sent = client.Send(sendBuffer, 0, Math.Min(sendBuffer.Length, remaining), SocketFlags.None); - - bytesSent += sent; - sentChecksum.Add(sendBuffer, 0, sent); - } - } - }); + ReceivedBytes = receivedBytes, + RemoteEndPoint = endPoint + }); + } + catch (Exception e) { tcs.TrySetException(e); } + }, null); + return tcs.Task; + } + public override Task SendAsync(Socket s, ArraySegment buffer) => + Task.Factory.FromAsync((callback, state) => + s.BeginSend(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, callback, state), + s.EndSend, null); + public override Task SendAsync(Socket s, IList> bufferList) => + Task.Factory.FromAsync(s.BeginSend, s.EndSend, bufferList, SocketFlags.None, null); + public override Task SendToAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + Task.Factory.FromAsync( + (callback, state) => s.BeginSendTo(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None, endPoint, callback, state), + s.EndSendTo, null); + } - Assert.True(Task.WaitAll(new[] { serverTask, clientTask }, TestTimeout), "Wait timed out"); + public sealed class SendReceiveTask : SendReceive + { + public SendReceiveTask(ITestOutputHelper output) : base(output) { } + public override Task AcceptAsync(Socket s) => + s.AcceptAsync(); + public override Task ConnectAsync(Socket s, EndPoint endPoint) => + s.ConnectAsync(endPoint); + public override Task ReceiveAsync(Socket s, ArraySegment buffer) => + s.ReceiveAsync(buffer, SocketFlags.None); + public override Task ReceiveAsync(Socket s, IList> bufferList) => + s.ReceiveAsync(bufferList, SocketFlags.None); + public override Task ReceiveFromAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + s.ReceiveFromAsync(buffer, SocketFlags.None, endPoint); + public override Task SendAsync(Socket s, ArraySegment buffer) => + s.SendAsync(buffer, SocketFlags.None); + public override Task SendAsync(Socket s, IList> bufferList) => + s.SendAsync(bufferList, SocketFlags.None); + public override Task SendToAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + s.SendToAsync(buffer, SocketFlags.None, endPoint); + } - Assert.Equal(bytesSent, bytesReceived); - Assert.Equal(sentChecksum.Sum, receivedChecksum.Sum); - } - finally + public sealed class SendReceiveEap : SendReceive + { + public SendReceiveEap(ITestOutputHelper output) : base(output) { } + public override Task AcceptAsync(Socket s) => + InvokeAsync(s, e => e.AcceptSocket, e => s.AcceptAsync(e)); + public override Task ConnectAsync(Socket s, EndPoint endPoint) => + InvokeAsync(s, e => true, e => { - listener.Stop(); - } - } + e.RemoteEndPoint = endPoint; + return s.ConnectAsync(e); + }); + public override Task ReceiveAsync(Socket s, ArraySegment buffer) => + InvokeAsync(s, e => e.BytesTransferred, e => + { + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + return s.ReceiveAsync(e); + }); + public override Task ReceiveAsync(Socket s, IList> bufferList) => + InvokeAsync(s, e => e.BytesTransferred, e => + { + e.BufferList = bufferList; + return s.ReceiveAsync(e); + }); + public override Task ReceiveFromAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + InvokeAsync(s, e => new SocketReceiveFromResult { ReceivedBytes = e.BytesTransferred, RemoteEndPoint = e.RemoteEndPoint }, e => + { + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + e.RemoteEndPoint = endPoint; + return s.ReceiveFromAsync(e); + }); + public override Task SendAsync(Socket s, ArraySegment buffer) => + InvokeAsync(s, e => e.BytesTransferred, e => + { + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + return s.SendAsync(e); + }); + public override Task SendAsync(Socket s, IList> bufferList) => + InvokeAsync(s, e => e.BytesTransferred, e => + { + e.BufferList = bufferList; + return s.SendAsync(e); + }); + public override Task SendToAsync(Socket s, ArraySegment buffer, EndPoint endPoint) => + InvokeAsync(s, e => e.BytesTransferred, e => + { + e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + e.RemoteEndPoint = endPoint; + return s.SendToAsync(e); + }); - [ActiveIssue(13778, TestPlatforms.OSX)] - [Fact] - public static async Task SendRecvAsync_0ByteReceive_Success() + private static Task InvokeAsync( + Socket s, + Func getResult, + Func invoke) { - using (Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) - using (Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + var tcs = new TaskCompletionSource(); + var saea = new SocketAsyncEventArgs(); + EventHandler handler = (_, e) => { - listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); - listener.Listen(1); - - Task acceptTask = listener.AcceptAsync(); - await Task.WhenAll( - acceptTask, - client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ((IPEndPoint)listener.LocalEndPoint).Port))); - - using (Socket server = await acceptTask) - { - TaskCompletionSource tcs = null; - - var ea = new SocketAsyncEventArgs(); - ea.SetBuffer(Array.Empty(), 0, 0); - ea.Completed += delegate { tcs.SetResult(true); }; - - for (int i = 0; i < 3; i++) - { - tcs = new TaskCompletionSource(); - - // Have the client do a 0-byte receive. No data is available, so this should pend. - Assert.True(client.ReceiveAsync(ea)); - Assert.Equal(0, client.Available); - - // Have the server send 1 byte to the client. - Assert.Equal(1, server.Send(new byte[1], 0, 1, SocketFlags.None)); - - // The client should now wake up, getting 0 bytes with 1 byte available. - await tcs.Task; - Assert.Equal(0, ea.BytesTransferred); - Assert.Equal(SocketError.Success, ea.SocketError); - Assert.Equal(1, client.Available); // Due to #13778, this sometimes fails on macOS - - // Receive that byte - Assert.Equal(1, client.Receive(new byte[1])); - Assert.Equal(0, client.Available); - } - } - } + if (e.SocketError == SocketError.Success) tcs.SetResult(getResult(e)); + else tcs.SetException(new SocketException((int)e.SocketError)); + saea.Dispose(); + }; + saea.Completed += handler; + if (!invoke(saea)) handler(s, saea); + return tcs.Task; } + } + public abstract class MemberDatas + { public static readonly object[][] Loopbacks = new[] { new object[] { IPAddress.Loopback }, @@ -1147,11 +678,5 @@ await Task.WhenAll( new object[] { IPAddress.Loopback, true }, new object[] { IPAddress.Loopback, false }, }; - - public static readonly object[][] LoopbacksToSameLoopback = new object[][] - { - new object[] { IPAddress.IPv6Loopback, IPAddress.IPv6Loopback }, - new object[] { IPAddress.Loopback, IPAddress.Loopback } - }; } }