From d872914bc56bdeb89102148e2fe950763a66cf16 Mon Sep 17 00:00:00 2001 From: Satya Madala Date: Mon, 26 Mar 2018 20:01:01 +0530 Subject: [PATCH] Fix socket exception on datacollection in parallel (#1505) * Add more logging for datacollection manager * Skip additional messages * Add more logging * Add logging for MessageLoopAsync * Add logging for MessageLoopAsync 2 * Stop the communication server on operation complete * Add tests and remove not required logging * Address review comments --- .../LengthPrefixCommunicationChannel.cs | 3 +- .../SocketClient.cs | 13 +++++--- .../SocketServer.cs | 17 ++++++---- .../TcpClientExtensions.cs | 25 ++++++++++---- .../TestRequestSender.cs | 1 + .../ProxyDataCollectionManager.cs | 21 ++++++++++-- .../EventHandlers/TestRequestHandler.cs | 19 ++++++----- .../TestRequestSenderTests.cs | 33 +++++++++++++++++++ 8 files changed, 104 insertions(+), 28 deletions(-) diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs index c09ad20eb6..fc5923787d 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/LengthPrefixCommunicationChannel.cs @@ -53,7 +53,7 @@ public Task Send(string data) } catch (Exception ex) { - EqtTrace.Verbose("LengthPrefixCommunicationChannel: Error sending data: {0}.", ex); + EqtTrace.Error("LengthPrefixCommunicationChannel.Send: Error sending data: {0}.", ex); throw new CommunicationException("Unable to send data over channel.", ex); } @@ -78,6 +78,7 @@ public Task NotifyDataAvailable() /// public void Dispose() { + EqtTrace.Verbose("LengthPrefixCommunicationChannel.Dispose: Dispose reader and writer."); this.reader.Dispose(); this.writer.Dispose(); } diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs index 1fc7739f72..4459dc8ec2 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs @@ -24,6 +24,7 @@ public class SocketClient : ICommunicationEndPoint private readonly Func channelFactory; private ICommunicationChannel channel; private bool stopped; + private string endPoint; public SocketClient() : this(stream => new LengthPrefixCommunicationChannel(stream)) @@ -49,12 +50,10 @@ protected SocketClient(Func channelFactory) /// public string Start(string endPoint) { + this.endPoint = endPoint; var ipEndPoint = endPoint.GetIPEndPoint(); - if (EqtTrace.IsVerboseEnabled) - { - EqtTrace.Verbose("Waiting for connecting to server"); - } + EqtTrace.Info("SocketClient.Start: connecting to server endpoint: {0}", endPoint); // Don't start if the endPoint port is zero this.tcpClient.ConnectAsync(ipEndPoint.Address, ipEndPoint.Port).ContinueWith(this.OnServerConnected); @@ -64,6 +63,8 @@ public string Start(string endPoint) /// public void Stop() { + EqtTrace.Info("SocketClient.Stop: Stop communication from server endpoint: {0}", this.endPoint); + if (!this.stopped) { EqtTrace.Info("SocketClient: Stop: Cancellation requested. Stopping message loop."); @@ -73,6 +74,8 @@ public void Stop() private void OnServerConnected(Task connectAsyncTask) { + EqtTrace.Info("SocketClient.OnServerConnected: connected to server endpoint: {0}", this.endPoint); + if (this.Connected != null) { if (connectAsyncTask.IsFaulted) @@ -105,6 +108,8 @@ private void OnServerConnected(Task connectAsyncTask) private void Stop(Exception error) { + EqtTrace.Info("SocketClient.PrivateStop: Stop communication from server endpoint: {0}, error:{1}", this.endPoint, error); + if (!this.stopped) { // Do not allow stop to be called multiple times. diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs index 8057e48af6..b2e6cd6649 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs @@ -11,7 +11,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using System.Threading.Tasks; using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.ObjectModel; - using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions; using Microsoft.VisualStudio.TestPlatform.Utilities; /// @@ -31,6 +30,8 @@ public class SocketServer : ICommunicationEndPoint private bool stopped; + private string endPoint; + /// /// Initializes a new instance of the class. /// @@ -64,21 +65,22 @@ public string Start(string endPoint) this.tcpListener.Start(); - var connectionInfo = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString(); - EqtTrace.Info("SocketServer: Listening on end point : {0}", connectionInfo); + this.endPoint = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString(); + EqtTrace.Info("SocketServer.Start: Listening on endpoint : {0}", this.endPoint); // Serves a single client at the moment. An error in connection, or message loop just // terminates the entire server. this.tcpListener.AcceptTcpClientAsync().ContinueWith(t => this.OnClientConnected(t.Result)); - return connectionInfo; + return this.endPoint; } /// public void Stop() { + EqtTrace.Info("SocketServer.Stop: Stop server endPoint: {0}", this.endPoint); if (!this.stopped) { - EqtTrace.Info("SocketServer: Stop: Cancellation requested. Stopping message loop."); + EqtTrace.Info("SocketServer.Stop: Cancellation requested. Stopping message loop."); this.cancellation.Cancel(); } } @@ -95,7 +97,7 @@ private void OnClientConnected(TcpClient client) if (EqtTrace.IsVerboseEnabled) { - EqtTrace.Verbose("Client connected, and starting MessageLoopAsync"); + EqtTrace.Verbose("SocketServer.OnClientConnected: Client connected for endPoint: {0}, starting MessageLoopAsync:", this.endPoint); } // Start the message loop @@ -105,6 +107,8 @@ private void OnClientConnected(TcpClient client) private void Stop(Exception error) { + EqtTrace.Info("SocketServer.PrivateStop: Stopp server endPoint: {0} error: {1}", this.endPoint, error); + if (!this.stopped) { // Do not allow stop to be called multiple times. @@ -124,6 +128,7 @@ private void Stop(Exception error) this.channel.Dispose(); this.cancellation.Dispose(); + EqtTrace.Info("SocketServer.Stop: Raise disconnected event endPoint: {0} error: {1}", this.endPoint, error); this.Disconnected?.SafeInvoke(this, new DisconnectedEventArgs { Error = error }, "SocketServer: ClientDisconnected"); } } diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/TcpClientExtensions.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/TcpClientExtensions.cs index 5ccc88f2d7..f0e3e2ef80 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/TcpClientExtensions.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/TcpClientExtensions.cs @@ -25,14 +25,19 @@ internal static Task MessageLoopAsync( CancellationToken cancellationToken) { Exception error = null; + var remoteEndPoint = client.Client.RemoteEndPoint.ToString(); + var localEndPoint = client.Client.LocalEndPoint.ToString(); // Set read timeout to avoid blocking receive raw message while (channel != null && !cancellationToken.IsCancellationRequested) { + EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: Polling on remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint); + try { if (client.Client.Poll(STREAMREADTIMEOUT, SelectMode.SelectRead)) { + EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: NotifyDataAvailable remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint); channel.NotifyDataAvailable(); } } @@ -43,14 +48,18 @@ internal static Task MessageLoopAsync( && socketException.SocketErrorCode == SocketError.TimedOut) { EqtTrace.Info( - "Socket: Message loop: failed to receive message due to read timeout {0}", - ioException); + "Socket: Message loop: failed to receive message due to read timeout {0}, remoteEndPoint: {1} localEndPoint: {2}", + ioException, + remoteEndPoint, + localEndPoint); } else { EqtTrace.Error( - "Socket: Message loop: failed to receive message due to socket error {0}", - ioException); + "Socket: Message loop: failed to receive message due to socket error {0}, remoteEndPoint: {1} localEndPoint: {2}", + ioException, + remoteEndPoint, + localEndPoint); error = ioException; break; } @@ -58,8 +67,10 @@ internal static Task MessageLoopAsync( catch (Exception exception) { EqtTrace.Error( - "Socket: Message loop: failed to receive message {0}", - exception); + "Socket: Message loop: failed to receive message {0}, remoteEndPoint: {1} localEndPoint: {2}", + exception, + remoteEndPoint, + localEndPoint); error = exception; break; } @@ -68,6 +79,8 @@ internal static Task MessageLoopAsync( // Try clean up and raise client disconnected events errorHandler(error); + EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: exiting MessageLoopAsync remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint); + return Task.FromResult(0); } diff --git a/src/Microsoft.TestPlatform.CommunicationUtilities/TestRequestSender.cs b/src/Microsoft.TestPlatform.CommunicationUtilities/TestRequestSender.cs index bf570335fe..874bdd3b2e 100644 --- a/src/Microsoft.TestPlatform.CommunicationUtilities/TestRequestSender.cs +++ b/src/Microsoft.TestPlatform.CommunicationUtilities/TestRequestSender.cs @@ -596,6 +596,7 @@ private void SetOperationComplete() EqtTrace.Verbose("TestRequestSender.SetOperationComplete: Setting operation complete."); } + this.communicationEndpoint.Stop(); Interlocked.CompareExchange(ref this.operationCompleted, 1, 0); } diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/DataCollection/ProxyDataCollectionManager.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/DataCollection/ProxyDataCollectionManager.cs index 89b59fde03..a6dbc7645d 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/DataCollection/ProxyDataCollectionManager.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/DataCollection/ProxyDataCollectionManager.cs @@ -45,6 +45,8 @@ internal class ProxyDataCollectionManager : IProxyDataCollectionManager private string settingsXml; private int connectionTimeout; private IRequestData requestData; + private int dataCollectionPort; + private int dataCollectionProcessId; /// /// Initializes a new instance of the class. @@ -126,6 +128,7 @@ public Collection AfterTestRunEnd(bool isCanceled, ITestMessageEv this.InvokeDataCollectionServiceAction( () => { + EqtTrace.Info("ProxyDataCollectionManager.AfterTestRunEnd: Get attachment set for datacollector processId: {0} port: {1}", dataCollectionProcessId, dataCollectionPort); attachmentSet = this.dataCollectionRequestSender.SendAfterTestRunStartAndGetResult(runEventsHandler, isCanceled); }, runEventsHandler); @@ -159,9 +162,15 @@ public DataCollectionParameters BeforeTestRunStart( this.InvokeDataCollectionServiceAction( () => { + EqtTrace.Info("ProxyDataCollectionManager.BeforeTestRunStart: Get env variable and port for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort); var result = this.dataCollectionRequestSender.SendBeforeTestRunStartAndGetResult(this.settingsXml, runEventsHandler); environmentVariables = result.EnvironmentVariables; dataCollectionEventsPort = result.DataCollectionEventsPort; + + EqtTrace.Info( + "ProxyDataCollectionManager.BeforeTestRunStart: SendBeforeTestRunStartAndGetResult successful, env variable from datacollector: {0} and testhost port: {1}", + string.Join(";", environmentVariables), + dataCollectionEventsPort); }, runEventsHandler); return new DataCollectionParameters( @@ -175,21 +184,27 @@ public DataCollectionParameters BeforeTestRunStart( /// public void Dispose() { + EqtTrace.Info("ProxyDataCollectionManager.Dispose: calling dospose for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort); this.dataCollectionRequestSender.Close(); } /// public void Initialize() { - var port = this.dataCollectionRequestSender.InitializeCommunication(); + this.dataCollectionPort = this.dataCollectionRequestSender.InitializeCommunication(); // Warn the user that execution will wait for debugger attach. - var processId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(port)); + this.dataCollectionProcessId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(this.dataCollectionPort)); + EqtTrace.Info("ProxyDataCollectionManager.Initialize: Launched datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort); + + ChangeConnectionTimeoutIfRequired(dataCollectionProcessId); + + EqtTrace.Info("ProxyDataCollectionManager.Initialize: waiting for connection with timeout: {0}", this.connectionTimeout); - ChangeConnectionTimeoutIfRequired(processId); var connected = this.dataCollectionRequestSender.WaitForRequestHandlerConnection(this.connectionTimeout); if (connected == false) { + EqtTrace.Error("ProxyDataCollectionManager.Initialize: failed to connect to datacollector process, processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort); throw new TestPlatformException(string.Format(CultureInfo.CurrentUICulture, CrossPlatEngineResources.FailedToConnectDataCollector)); } } diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/EventHandlers/TestRequestHandler.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/EventHandlers/TestRequestHandler.cs index 144787bb24..cbe9a27f5a 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/EventHandlers/TestRequestHandler.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/EventHandlers/TestRequestHandler.cs @@ -10,7 +10,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.EventHandlers; using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces; using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel; - using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.DataCollection.Interfaces; using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.EventHandlers; using Microsoft.VisualStudio.TestPlatform.ObjectModel; using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client; @@ -20,7 +19,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities using Microsoft.VisualStudio.TestPlatform.ObjectModel.Utilities; using Microsoft.VisualStudio.TestPlatform.Utilities; using CrossPlatResources = Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Resources.Resources; - using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine; public class TestRequestHandler : IDisposable, ITestRequestHandler { @@ -131,14 +129,14 @@ public void Close() public void SendTestCases(IEnumerable discoveredTestCases) { var data = this.dataSerializer.SerializePayload(MessageType.TestCasesFound, discoveredTestCases, this.protocolVersion); - this.channel.Send(data); + this.SendData(data); } /// public void SendTestRunStatistics(TestRunChangedEventArgs testRunChangedArgs) { var data = this.dataSerializer.SerializePayload(MessageType.TestRunStatsChange, testRunChangedArgs, this.protocolVersion); - this.channel.Send(data); + this.SendData(data); } /// @@ -148,7 +146,7 @@ public void SendLog(TestMessageLevel messageLevel, string message) MessageType.TestMessage, new TestMessagePayload { MessageLevel = messageLevel, Message = message }, this.protocolVersion); - this.channel.Send(data); + this.SendData(data); } /// @@ -168,7 +166,7 @@ public void SendExecutionComplete( ExecutorUris = executorUris }, this.protocolVersion); - this.channel.Send(data); + this.SendData(data); } /// @@ -184,7 +182,7 @@ public void DiscoveryComplete(DiscoveryCompleteEventArgs discoveryCompleteEventA Metrics = discoveryCompleteEventArgs.Metrics }, this.protocolVersion); - this.channel.Send(data); + this.SendData(data); } /// @@ -201,7 +199,7 @@ public int LaunchProcessWithDebuggerAttached(TestProcessStartInfo testProcessSta var data = dataSerializer.SerializePayload(MessageType.LaunchAdapterProcessWithDebuggerAttached, testProcessStartInfo, protocolVersion); - this.channel.Send(data); + this.SendData(data); EqtTrace.Verbose("Waiting for LaunchAdapterProcessWithDebuggerAttached ack"); waitHandle.Wait(); @@ -389,5 +387,10 @@ private void SetCommunicationEndPoint() } } + private void SendData(string data) + { + EqtTrace.Verbose("TestRequestHandler.SendData: sending data from testhost: {0}", data); + this.channel.Send(data); + } } } diff --git a/test/Microsoft.TestPlatform.CommunicationUtilities.UnitTests/TestRequestSenderTests.cs b/test/Microsoft.TestPlatform.CommunicationUtilities.UnitTests/TestRequestSenderTests.cs index 94dfe51409..bb0e9412fa 100644 --- a/test/Microsoft.TestPlatform.CommunicationUtilities.UnitTests/TestRequestSenderTests.cs +++ b/test/Microsoft.TestPlatform.CommunicationUtilities.UnitTests/TestRequestSenderTests.cs @@ -289,6 +289,20 @@ public void DiscoverTestsShouldNotifyDiscoveryCompleteOnCompleteMessageReceived( this.mockDiscoveryEventsHandler.Verify(eh => eh.HandleDiscoveryComplete(It.Is(dc => dc.IsAborted == false && dc.TotalCount == 10), null)); } + [TestMethod] + public void DiscoverTestsShouldStopServerOnCompleteMessageReceived() + { + var completePayload = new DiscoveryCompletePayload { TotalTests = 10, IsAborted = false }; + this.SetupDeserializeMessage(MessageType.DiscoveryComplete, completePayload); + this.SetupFakeCommunicationChannel(); + + this.testRequestSender.DiscoverTests(new DiscoveryCriteria(), this.mockDiscoveryEventsHandler.Object); + + this.RaiseMessageReceivedEvent(); + + this.mockServer.Verify(ms => ms.Stop()); + } + [TestMethod] public void DiscoverTestShouldNotifyLogMessageOnTestMessageReceived() { @@ -505,6 +519,25 @@ public void StartTestRunShouldNotifyExecutionCompleteOnRunCompleteMessageReceive Times.Once); } + [TestMethod] + public void StartTestRunShouldStopServerOnRunCompleteMessageReceived() + { + var testRunCompletePayload = new TestRunCompletePayload + { + TestRunCompleteArgs = new TestRunCompleteEventArgs(null, false, false, null, null, TimeSpan.MaxValue), + LastRunTests = new TestRunChangedEventArgs(null, null, null), + RunAttachments = new List() + }; + this.SetupDeserializeMessage(MessageType.ExecutionComplete, testRunCompletePayload); + this.SetupFakeCommunicationChannel(); + + this.testRequestSender.StartTestRun(this.testRunCriteriaWithSources, this.mockExecutionEventsHandler.Object); + + this.RaiseMessageReceivedEvent(); + + this.mockServer.Verify(ms => ms.Stop()); + } + [TestMethod] public void StartTestRunShouldNotifyLogMessageOnTestMessageReceived() {