Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix socket exception on datacollection in parallel #1505

Merged
merged 9 commits into from
Mar 26, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Task Send(string data)
}
catch (Exception ex)
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel: Error sending data: {0}.", ex);
EqtTrace.Error("LengthPrefixCommunicationChannel.Send: Error sending data: {0}.", ex);
throw new CommunicationException("Unable to send data over channel.", ex);
}

Expand All @@ -78,6 +78,7 @@ public Task NotifyDataAvailable()
/// <inheritdoc />
public void Dispose()
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel.Dispose: Dispose reader and writer.");
this.reader.Dispose();
this.writer.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class SocketClient : ICommunicationEndPoint
private readonly Func<Stream, ICommunicationChannel> channelFactory;
private ICommunicationChannel channel;
private bool stopped;
private string endPoint;

public SocketClient()
: this(stream => new LengthPrefixCommunicationChannel(stream))
Expand All @@ -49,12 +50,10 @@ protected SocketClient(Func<Stream, ICommunicationChannel> channelFactory)
/// <inheritdoc />
public string Start(string endPoint)
{
this.endPoint = endPoint;
var ipEndPoint = endPoint.GetIPEndPoint();

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Waiting for connecting to server");
}
EqtTrace.Info("SocketClient.Start: connecting to server endpoint: {0}", endPoint);

// Don't start if the endPoint port is zero
this.tcpClient.ConnectAsync(ipEndPoint.Address, ipEndPoint.Port).ContinueWith(this.OnServerConnected);
Expand All @@ -64,6 +63,8 @@ public string Start(string endPoint)
/// <inheritdoc />
public void Stop()
{
EqtTrace.Info("SocketClient.Stop: Stop communication from server endpoint: {0}", this.endPoint);

if (!this.stopped)
{
EqtTrace.Info("SocketClient: Stop: Cancellation requested. Stopping message loop.");
Expand All @@ -73,6 +74,8 @@ public void Stop()

private void OnServerConnected(Task connectAsyncTask)
{
EqtTrace.Info("SocketClient.OnServerConnected: connected to server endpoint: {0}", this.endPoint);

if (this.Connected != null)
{
if (connectAsyncTask.IsFaulted)
Expand Down Expand Up @@ -105,6 +108,8 @@ private void OnServerConnected(Task connectAsyncTask)

private void Stop(Exception error)
{
EqtTrace.Info("SocketClient.PrivateStop: Stop communication from server endpoint: {0}, error:{1}", this.endPoint, error);

if (!this.stopped)
{
// Do not allow stop to be called multiple times.
Expand Down
17 changes: 11 additions & 6 deletions src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand All @@ -31,6 +30,8 @@ public class SocketServer : ICommunicationEndPoint

private bool stopped;

private string endPoint;

/// <summary>
/// Initializes a new instance of the <see cref="SocketServer"/> class.
/// </summary>
Expand Down Expand Up @@ -64,21 +65,22 @@ public string Start(string endPoint)

this.tcpListener.Start();

var connectionInfo = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString();
EqtTrace.Info("SocketServer: Listening on end point : {0}", connectionInfo);
this.endPoint = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString();
EqtTrace.Info("SocketServer.Start: Listening on endpoint : {0}", this.endPoint);

// Serves a single client at the moment. An error in connection, or message loop just
// terminates the entire server.
this.tcpListener.AcceptTcpClientAsync().ContinueWith(t => this.OnClientConnected(t.Result));
return connectionInfo;
return this.endPoint;
}

/// <inheritdoc />
public void Stop()
{
EqtTrace.Info("SocketServer.Stop: Stop server endPoint: {0}", this.endPoint);
if (!this.stopped)
{
EqtTrace.Info("SocketServer: Stop: Cancellation requested. Stopping message loop.");
EqtTrace.Info("SocketServer.Stop: Cancellation requested. Stopping message loop.");
this.cancellation.Cancel();
}
}
Expand All @@ -95,7 +97,7 @@ private void OnClientConnected(TcpClient client)

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Client connected, and starting MessageLoopAsync");
EqtTrace.Verbose("SocketServer.OnClientConnected: Client connected for endPoint: {0}, starting MessageLoopAsync:", this.endPoint);
}

// Start the message loop
Expand All @@ -105,6 +107,8 @@ private void OnClientConnected(TcpClient client)

private void Stop(Exception error)
{
EqtTrace.Info("SocketServer.PrivateStop: Stopp server endPoint: {0} error: {1}", this.endPoint, error);

if (!this.stopped)
{
// Do not allow stop to be called multiple times.
Expand All @@ -124,6 +128,7 @@ private void Stop(Exception error)
this.channel.Dispose();
this.cancellation.Dispose();

EqtTrace.Info("SocketServer.Stop: Raise disconnected event endPoint: {0} error: {1}", this.endPoint, error);
this.Disconnected?.SafeInvoke(this, new DisconnectedEventArgs { Error = error }, "SocketServer: ClientDisconnected");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ internal static Task MessageLoopAsync(
CancellationToken cancellationToken)
{
Exception error = null;
var remoteEndPoint = ((IPEndPoint)client.Client.RemoteEndPoint).ToString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need typecast?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

var localEndPoint = ((IPEndPoint)client.Client.LocalEndPoint).ToString();

// Set read timeout to avoid blocking receive raw message
while (channel != null && !cancellationToken.IsCancellationRequested)
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: loop starting: remoteendPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);

try
{
if (client.Client.Poll(STREAMREADTIMEOUT, SelectMode.SelectRead))
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: NotifyDataAvailable remoteendPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageLoopAsync [](start = 62, length = 16)

Lets see logs for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

channel.NotifyDataAvailable();
}
}
Expand All @@ -43,23 +48,29 @@ internal static Task MessageLoopAsync(
&& socketException.SocketErrorCode == SocketError.TimedOut)
{
EqtTrace.Info(
"Socket: Message loop: failed to receive message due to read timeout {0}",
ioException);
"Socket: Message loop: failed to receive message due to read timeout {0}, remoteendPoint: {1} localEndPoint: {2}",
ioException,
remoteEndPoint,
localEndPoint);
}
else
{
EqtTrace.Error(
"Socket: Message loop: failed to receive message due to socket error {0}",
ioException);
"Socket: Message loop: failed to receive message due to socket error {0}, remoteendPoint: {1} localEndPoint: {2}",
ioException,
remoteEndPoint,
localEndPoint);
error = ioException;
break;
}
}
catch (Exception exception)
{
EqtTrace.Error(
"Socket: Message loop: failed to receive message {0}",
exception);
"Socket: Message loop: failed to receive message {0}, remoteendPoint: {1} localEndPoint: {2}",
exception,
remoteEndPoint,
localEndPoint);
error = exception;
break;
}
Expand All @@ -68,6 +79,8 @@ internal static Task MessageLoopAsync(
// Try clean up and raise client disconnected events
errorHandler(error);

EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: exiting MessageLoopAsync remoteendPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);

return Task.FromResult(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ private void SetOperationComplete()
EqtTrace.Verbose("TestRequestSender.SetOperationComplete: Setting operation complete.");
}

this.communicationEndpoint.Stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need lock here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required.

Interlocked.CompareExchange(ref this.operationCompleted, 1, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ internal class ProxyDataCollectionManager : IProxyDataCollectionManager
private string settingsXml;
private int connectionTimeout;
private IRequestData requestData;
private int dataCollectionPort;
private int dataCollectionProcessId;

/// <summary>
/// Initializes a new instance of the <see cref="ProxyDataCollectionManager"/> class.
Expand Down Expand Up @@ -126,6 +128,7 @@ public Collection<AttachmentSet> AfterTestRunEnd(bool isCanceled, ITestMessageEv
this.InvokeDataCollectionServiceAction(
() =>
{
EqtTrace.Info("ProxyDataCollectionManager.AfterTestRunEnd: Get attachment set for datacollector processId: {0} port: {1}", dataCollectionProcessId, dataCollectionPort);
attachmentSet = this.dataCollectionRequestSender.SendAfterTestRunStartAndGetResult(runEventsHandler, isCanceled);
},
runEventsHandler);
Expand Down Expand Up @@ -159,9 +162,15 @@ public DataCollectionParameters BeforeTestRunStart(
this.InvokeDataCollectionServiceAction(
() =>
{
EqtTrace.Info("ProxyDataCollectionManager.BeforeTestRunStart: Get env variable and port for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);
var result = this.dataCollectionRequestSender.SendBeforeTestRunStartAndGetResult(this.settingsXml, runEventsHandler);
environmentVariables = result.EnvironmentVariables;
dataCollectionEventsPort = result.DataCollectionEventsPort;

EqtTrace.Info(
"ProxyDataCollectionManager.BeforeTestRunStart: SendBeforeTestRunStartAndGetResult successful, env variable from datacollector: {0} and testhost port: {1}",
string.Join(";", environmentVariables),
dataCollectionEventsPort);
},
runEventsHandler);
return new DataCollectionParameters(
Expand All @@ -175,21 +184,27 @@ public DataCollectionParameters BeforeTestRunStart(
/// </summary>
public void Dispose()
{
EqtTrace.Info("ProxyDataCollectionManager.Dispose: calling dospose for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);
this.dataCollectionRequestSender.Close();
}

/// <inheritdoc />
public void Initialize()
{
var port = this.dataCollectionRequestSender.InitializeCommunication();
this.dataCollectionPort = this.dataCollectionRequestSender.InitializeCommunication();

// Warn the user that execution will wait for debugger attach.
var processId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(port));
this.dataCollectionProcessId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(this.dataCollectionPort));
EqtTrace.Info("ProxyDataCollectionManager.Initialize: Launched datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);

ChangeConnectionTimeoutIfRequired(dataCollectionProcessId);

EqtTrace.Info("ProxyDataCollectionManager.Initialize: waiting for connection with timeout: {0}", this.connectionTimeout);

ChangeConnectionTimeoutIfRequired(processId);
var connected = this.dataCollectionRequestSender.WaitForRequestHandlerConnection(this.connectionTimeout);
if (connected == false)
{
EqtTrace.Error("ProxyDataCollectionManager.Initialize: failed to connect to datacollector process.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add datacollector process ID we could not connect to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

throw new TestPlatformException(string.Format(CultureInfo.CurrentUICulture, CrossPlatEngineResources.FailedToConnectDataCollector));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.EventHandlers;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.DataCollection.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.EventHandlers;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
Expand All @@ -20,7 +19,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Utilities;
using Microsoft.VisualStudio.TestPlatform.Utilities;
using CrossPlatResources = Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Resources.Resources;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine;

public class TestRequestHandler : IDisposable, ITestRequestHandler
{
Expand Down Expand Up @@ -131,14 +129,14 @@ public void Close()
public void SendTestCases(IEnumerable<TestCase> discoveredTestCases)
{
var data = this.dataSerializer.SerializePayload(MessageType.TestCasesFound, discoveredTestCases, this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
public void SendTestRunStatistics(TestRunChangedEventArgs testRunChangedArgs)
{
var data = this.dataSerializer.SerializePayload(MessageType.TestRunStatsChange, testRunChangedArgs, this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -148,7 +146,7 @@ public void SendLog(TestMessageLevel messageLevel, string message)
MessageType.TestMessage,
new TestMessagePayload { MessageLevel = messageLevel, Message = message },
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -168,7 +166,7 @@ public void SendExecutionComplete(
ExecutorUris = executorUris
},
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -184,7 +182,7 @@ public void DiscoveryComplete(DiscoveryCompleteEventArgs discoveryCompleteEventA
Metrics = discoveryCompleteEventArgs.Metrics
},
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -201,7 +199,7 @@ public int LaunchProcessWithDebuggerAttached(TestProcessStartInfo testProcessSta
var data = dataSerializer.SerializePayload(MessageType.LaunchAdapterProcessWithDebuggerAttached,
testProcessStartInfo, protocolVersion);

this.channel.Send(data);
this.SendData(data);

EqtTrace.Verbose("Waiting for LaunchAdapterProcessWithDebuggerAttached ack");
waitHandle.Wait();
Expand Down Expand Up @@ -389,5 +387,10 @@ private void SetCommunicationEndPoint()
}
}

private void SendData(string data)
{
EqtTrace.Verbose("TestRequestHandler.SendData: sending data from testhost: {0}", data);
this.channel.Send(data);
}
}
}
Loading