diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs index a14db1f189..7e8422f172 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs @@ -7,9 +7,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using System.IO; using System.Text; using System.Threading.Tasks; - using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.ObjectModel; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions; using Microsoft.VisualStudio.TestPlatform.Utilities; /// @@ -17,17 +17,16 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities /// public class LengthPrefixCommunicationChannel : ICommunicationChannel { - private readonly Stream stream; - private readonly BinaryReader reader; private readonly BinaryWriter writer; public LengthPrefixCommunicationChannel(Stream stream) { - this.stream = stream; this.reader = new BinaryReader(stream, Encoding.UTF8, true); - this.writer = new BinaryWriter(stream, Encoding.UTF8, true); + + // Using the Buffered stream while writing, improves the write performance. By reducing the number of writes. + this.writer = new BinaryWriter(new PlatformStream().CreateBufferedStream(stream, SocketConstants.BufferSize), Encoding.UTF8, true); } /// diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs index ffb1ac0a01..587096c2d6 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs @@ -11,6 +11,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using System.Threading.Tasks; using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.ObjectModel; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions; using Microsoft.VisualStudio.TestPlatform.Utilities; /// @@ -35,7 +36,7 @@ protected SocketClient(Func channelFactory) this.cancellation = new CancellationTokenSource(); this.stopped = false; - this.tcpClient = new TcpClient(); + this.tcpClient = new TcpClient { NoDelay = true }; this.channelFactory = channelFactory; } diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketCommunicationManager.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketCommunicationManager.cs index aaa1c7101c..1d7fe6efe6 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketCommunicationManager.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketCommunicationManager.cs @@ -10,9 +10,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; - using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.ObjectModel; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions; /// /// Facilitates communication using sockets @@ -70,11 +70,6 @@ public class SocketCommunicationManager : ICommunicationManager /// private object sendSyncObject = new object(); - /// - /// Stream to use read timeout - /// - private NetworkStream stream; - private Socket socket; /// @@ -100,7 +95,6 @@ internal SocketCommunicationManager(IDataSerializer dataSerializer) public IPEndPoint HostServer(IPEndPoint endpoint) { this.tcpListener = new TcpListener(endpoint); - this.tcpListener.Start(); EqtTrace.Info("Listening on Endpoint : {0}", (IPEndPoint)this.tcpListener.LocalEndpoint); @@ -119,13 +113,20 @@ public async Task AcceptClientAsync() var client = await this.tcpListener.AcceptTcpClientAsync(); this.socket = client.Client; - this.stream = client.GetStream(); - this.binaryReader = new BinaryReader(this.stream); - this.binaryWriter = new BinaryWriter(this.stream); + this.socket.NoDelay = true; - this.clientConnectedEvent.Set(); + // Using Buffered stream only in case of write, and Network stream in case of read. + var bufferedStream = new PlatformStream().CreateBufferedStream(client.GetStream(), SocketConstants.BufferSize); + var networkStream = client.GetStream(); + this.binaryReader = new BinaryReader(networkStream); + this.binaryWriter = new BinaryWriter(bufferedStream); - EqtTrace.Info("Accepted Client request and set the flag"); + this.clientConnectedEvent.Set(); + if (EqtTrace.IsInfoEnabled) + { + EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize); + EqtTrace.Info("Accepted Client request and set the flag"); + } } } @@ -165,7 +166,7 @@ public async Task SetupClientAsync(IPEndPoint endpoint) // for now added a check for validation of this.tcpclient this.clientConnectionAcceptedEvent.Reset(); EqtTrace.Info("Trying to connect to server on socket : {0} ", endpoint); - this.tcpClient = new TcpClient(); + this.tcpClient = new TcpClient { NoDelay = true }; this.socket = this.tcpClient.Client; Stopwatch watch = new Stopwatch(); @@ -178,10 +179,18 @@ public async Task SetupClientAsync(IPEndPoint endpoint) if (this.tcpClient.Connected) { - this.stream = this.tcpClient.GetStream(); - this.binaryReader = new BinaryReader(this.stream); - this.binaryWriter = new BinaryWriter(this.stream); - EqtTrace.Info("Connected to the server successfully "); + // Using Buffered stream only in case of write, and Network stream in case of read. + var bufferedStream = new PlatformStream().CreateBufferedStream(this.tcpClient.GetStream(), SocketConstants.BufferSize); + var networkStream = this.tcpClient.GetStream(); + this.binaryReader = new BinaryReader(networkStream); + this.binaryWriter = new BinaryWriter(bufferedStream); + + if (EqtTrace.IsInfoEnabled) + { + EqtTrace.Info("Connected to the server successfully "); + EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize); + } + this.clientConnectionAcceptedEvent.Set(); } } diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketConstants.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketConstants.cs new file mode 100644 index 0000000000..4279445ee9 --- /dev/null +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketConstants.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities +{ + public class SocketConstants + { + // Buffer size for the buffered stream we are using. + public const int BufferSize = 16384; + } +} diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs index 6fa970136b..a514048a1b 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs @@ -9,9 +9,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; - using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.ObjectModel; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions; using Microsoft.VisualStudio.TestPlatform.Utilities; /// @@ -88,10 +88,11 @@ public void Stop() private void OnClientConnected(TcpClient client) { this.tcpClient = client; + this.tcpClient.Client.NoDelay = true; if (this.ClientConnected != null) { - this.channel = this.channelFactory(client.GetStream()); + this.channel = this.channelFactory(this.tcpClient.GetStream()); this.ClientConnected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketServer: ClientConnected"); // Start the message loop diff --git a/src/Microsoft.TestPlatform.PlatformAbstractions/Interfaces/IO/IStream.cs b/src/Microsoft.TestPlatform.PlatformAbstractions/Interfaces/IO/IStream.cs new file mode 100644 index 0000000000..127283bdcc --- /dev/null +++ b/src/Microsoft.TestPlatform.PlatformAbstractions/Interfaces/IO/IStream.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces +{ + using System.IO; + + /// + /// Helper class to return plaform specific stream. + /// + public interface IStream + { + /// + /// Returns platrform specific Buffered Stream with desired buffer size. + /// + /// Input Stream + /// Buffer Size + /// Buffered Stream + Stream CreateBufferedStream(Stream stream, int bufferSize); + } +} \ No newline at end of file diff --git a/src/Microsoft.TestPlatform.PlatformAbstractions/common/IO/PlatformStream.cs b/src/Microsoft.TestPlatform.PlatformAbstractions/common/IO/PlatformStream.cs new file mode 100644 index 0000000000..fddfab4524 --- /dev/null +++ b/src/Microsoft.TestPlatform.PlatformAbstractions/common/IO/PlatformStream.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions +{ + using System.IO; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces; + + /// + public class PlatformStream : IStream + { + /// + public Stream CreateBufferedStream(Stream stream, int bufferSize) + { + return new BufferedStream(stream, bufferSize); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.TestPlatform.PlatformAbstractions/netstandard1.0/IO/PlatformStream.cs b/src/Microsoft.TestPlatform.PlatformAbstractions/netstandard1.0/IO/PlatformStream.cs new file mode 100644 index 0000000000..cfcd7260f3 --- /dev/null +++ b/src/Microsoft.TestPlatform.PlatformAbstractions/netstandard1.0/IO/PlatformStream.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions +{ + using System; + using System.IO; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces; + + /// + public class PlatformStream : IStream + { + /// + public Stream CreateBufferedStream(Stream stream, int bufferSize) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.TestPlatform.PlatformAbstractions/uap10.0/IO/PlatformStream.cs b/src/Microsoft.TestPlatform.PlatformAbstractions/uap10.0/IO/PlatformStream.cs new file mode 100644 index 0000000000..0f1f343949 --- /dev/null +++ b/src/Microsoft.TestPlatform.PlatformAbstractions/uap10.0/IO/PlatformStream.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions +{ + using System.IO; + using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces; + + /// + public class PlatformStream : IStream + { + /// + public Stream CreateBufferedStream(Stream stream, int bufferSize) + { + return stream; + } + } +} \ No newline at end of file diff --git a/test/Microsoft.TestPlatform.CommunicationUtilities.PlatformTests/SocketCommunicationManagerTests.cs b/test/Microsoft.TestPlatform.CommunicationUtilities.PlatformTests/SocketCommunicationManagerTests.cs index 80897ed9ae..b070989b1b 100644 --- a/test/Microsoft.TestPlatform.CommunicationUtilities.PlatformTests/SocketCommunicationManagerTests.cs +++ b/test/Microsoft.TestPlatform.CommunicationUtilities.PlatformTests/SocketCommunicationManagerTests.cs @@ -4,14 +4,15 @@ namespace Microsoft.TestPlatform.CommunicationUtilities.PlatformTests { using System; + using System.Diagnostics; using System.IO; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; - using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities; + using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -264,9 +265,55 @@ public async Task ReceiveRawMessageAsyncShouldNotDeserializeThePayload() Assert.AreEqual(DummyPayload, message); } - #endregion + [TestMethod] + public void SocketPollShouldNotHangServerClientCommunication() + { + // Measure the throughput with socket communication v1 (SocketCommunicationManager) + // implementation. + var server = new SocketCommunicationManager(); + var client = new SocketCommunicationManager(); + + int port = server.HostServer(new IPEndPoint(IPAddress.Loopback, 0)).Port; + client.SetupClientAsync(new IPEndPoint(IPAddress.Loopback, port)).Wait(); + server.AcceptClientAsync().Wait(); + + server.WaitForClientConnection(1000); + client.WaitForServerConnection(1000); + + var clientThread = new Thread(() => SendData(client)); + clientThread.Start(); + + var dataReceived = 0; + while (dataReceived < 2048 * 5) + { + dataReceived += server.ReceiveRawMessageAsync(CancellationToken.None).Result.Length; + Task.Delay(1000).Wait(); + } + + clientThread.Join(); + + Assert.IsTrue(true); + } + + private static void SendData(ICommunicationManager communicationManager) + { + // Having less than the buffer size in SocketConstants.BUFFERSIZE. + var dataBytes = new byte[2048]; + for (int i = 0; i < dataBytes.Length; i++) + { + dataBytes[i] = 0x65; + } + + var dataBytesStr = Encoding.UTF8.GetString(dataBytes); + + for (int i = 0; i < 5; i++) + { + communicationManager.SendRawMessage(dataBytesStr); + } + } + private int StartServer() { this.tcpListener.Start();