Skip to content

Commit

Permalink
Timeout when waiting for TCP connection (#580)
Browse files Browse the repository at this point in the history
This will make the app not hang indefinitely in case something happens to the host

This needs #579
  • Loading branch information
premun authored May 4, 2021
1 parent 9b4c765 commit 89cb4b1
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 106 deletions.
237 changes: 138 additions & 99 deletions src/Microsoft.DotNet.XHarness.TestRunners.Common/TcpTextWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,161 +8,200 @@
// https://github.com/spouliot/Touch.Unit/blob/master/NUnitLite/TouchRunner/TcpTextWriter.cs

using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

#nullable enable
namespace Microsoft.DotNet.XHarness.TestRunners.Common
{
internal class TcpTextWriter : TextWriter
{
private readonly TcpClient _client;
private readonly TcpListener _server;
private readonly StreamWriter _writer;
private static readonly TimeSpan s_connectionAwaitPeriod = TimeSpan.FromMinutes(1);

private static string SelectHostName(string[] names, int port)
{
if (names.Length == 0)
{
return null;
}
private TcpClient? _client = null;
private StreamWriter? _writer = null;

if (names.Length == 1)
public void InitializeTunnelConnection(int port)
{
if ((port < 0) || (port > ushort.MaxValue))
{
return names[0];
throw new ArgumentOutOfRangeException(nameof(port), $"Port must be between 0 and {ushort.MaxValue}");
}

object lock_obj = new object();
string result = null;
int failures = 0;
var server = new TcpListener(IPAddress.Any, port);
server.Server.ReceiveTimeout = 5000;
server.Start();
var watch = Stopwatch.StartNew();

using (var evt = new ManualResetEvent(false))
while (!server.Pending())
{
for (int i = names.Length - 1; i >= 0; i--)
if (watch.Elapsed > s_connectionAwaitPeriod)
{
var name = names[i];
ThreadPool.QueueUserWorkItem((v) =>
{
try
{
var client = new TcpClient(name, port);
using (var writer = new StreamWriter(client.GetStream()))
{
writer.WriteLine("ping");
}
lock (lock_obj)
{
if (result == null)
{
result = name;
}
}
evt.Set();
}
catch (Exception)
{
lock (lock_obj)
{
failures++;
if (failures == names.Length)
{
evt.Set();
}
}
}
});
throw new Exception($"No inbound TCP connection after {(int) s_connectionAwaitPeriod.TotalSeconds} seconds");
}

// Wait for 1 success or all failures
evt.WaitOne();
Thread.Sleep(100);
}

if (result == null)
_client = server.AcceptTcpClient();

// Block until we have the ping from the client side
byte[] buffer = new byte[16 * 1024];
var stream = _client.GetStream();
while ((_ = stream.Read(buffer, 0, buffer.Length)) != 0)
{
throw new InvalidOperationException("Couldn't connect to any of the hostnames.");
var message = Encoding.UTF8.GetString(buffer);
if (message.Contains("ping"))
{
break;
}
}

return result;
_writer = new StreamWriter(_client.GetStream());
}

public TcpTextWriter(string hostName, int port, bool isTunnel = false)
public void InitializeDirectConnection(string hostName, int port)
{
if ((port < 0) || (port > ushort.MaxValue))
{
throw new ArgumentOutOfRangeException(nameof(port), $"Port must be between 0 and {ushort.MaxValue}");
}

if (!isTunnel && hostName == null)
if (hostName is null)
{
throw new ArgumentNullException(nameof(hostName));
}

if (!isTunnel)
if ((port < 0) || (port > ushort.MaxValue))
{
HostName = SelectHostName(hostName.Split(','), port);
throw new ArgumentOutOfRangeException(nameof(port), $"Port must be between 0 and {ushort.MaxValue}");
}

Port = port;

if (isTunnel)
{
_server = new TcpListener(IPAddress.Any, Port);
_server.Server.ReceiveTimeout = 5000;
_server.Start();

_client = _server.AcceptTcpClient();

// Block until we have the ping from the client side
byte[] buffer = new byte[16 * 1024];
var stream = _client.GetStream();
while ((_ = stream.Read(buffer, 0, buffer.Length)) != 0)
{
var message = Encoding.UTF8.GetString(buffer);
if (message.Contains("ping"))
{
break;
}
}
}
else
{
_client = new TcpClient(HostName, port);
}
hostName = SelectHostName(hostName.Split(','), port);

_client = new TcpClient(hostName, port);
_writer = new StreamWriter(_client.GetStream());
}

public string HostName { get; private set; }

public int Port { get; private set; }

// we override everything that StreamWriter overrides from TextWriter

public override System.Text.Encoding Encoding => Encoding.UTF8;
public override Encoding Encoding => Encoding.UTF8;

public override void Close() => _writer.Close();
public override void Close()
{
ValidateWriter();
_writer.Close();
}

protected override void Dispose(bool disposing) => _writer.Dispose();
protected override void Dispose(bool disposing) => _writer?.Dispose();

public override void Flush() => _writer.Flush();
public override void Flush()
{
ValidateWriter();
_writer.Flush();
}

// minimum to override - see http://msdn.microsoft.com/en-us/library/system.io.textwriter.aspx
public override void Write(char value) => _writer.Write(value);
public override void Write(char value)
{
ValidateWriter();
_writer.Write(value);
}

public override void Write(char[] buffer) => _writer.Write(buffer);
public override void Write(char[]? buffer)
{
ValidateWriter();
_writer.Write(buffer);
}

public override void Write(char[] buffer, int index, int count) => _writer.Write(buffer, index, count);
public override void Write(char[] buffer, int index, int count)
{
ValidateWriter();
_writer.Write(buffer, index, count);
}

public override void Write(string value) => _writer.Write(value);
public override void Write(string? value)
{
ValidateWriter();
_writer.Write(value);
}

// special extra override to ensure we flush data regularly

public override void WriteLine()
{
ValidateWriter();
_writer.WriteLine();
_writer.Flush();
}

private static string SelectHostName(string[] names, int port)
{
if (names.Length == 1)
{
return names[0];
}

object lock_obj = new object();
string? result = null;
int failures = 0;

using (var evt = new ManualResetEvent(false))
{
for (int i = names.Length - 1; i >= 0; i--)
{
var name = names[i];
ThreadPool.QueueUserWorkItem((v) =>
{
try
{
var client = new TcpClient(name, port);
using (var writer = new StreamWriter(client.GetStream()))
{
writer.WriteLine("ping");
}
lock (lock_obj)
{
if (result == null)
{
result = name;
}
}
evt.Set();
}
catch (Exception)
{
lock (lock_obj)
{
failures++;
if (failures == names.Length)
{
evt.Set();
}
}
}
});
}

// Wait for 1 success or all failures
evt.WaitOne();
}

if (result == null)
{
throw new InvalidOperationException("Couldn't connect to any of the hostnames.");
}

return result;
}

[MemberNotNull(nameof(_writer))]
private void ValidateWriter()
{
if (_writer == null)
{
throw new InvalidOperationException("Please initialize the writer before usage by calling one of the Initialize*() methods.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ protected override TestRunner GetTestRunner(LogWriter logWriter)
public override async Task RunAsync()
{
var options = ApplicationOptions.Current;
TcpTextWriter writer = null;
if (!string.IsNullOrEmpty(options.HostName))
var writer = new TcpTextWriter();
try
{
try
if (options.UseTunnel)
{
writer = new TcpTextWriter(options.HostName, options.HostPort, options.UseTunnel);
writer.InitializeTunnelConnection(options.HostPort);
}
catch (Exception ex)
else
{
Console.WriteLine("Network error: Cannot connect to {0}:{1}: {2}. Continuing on console.", options.HostName, options.HostPort, ex);
writer = null; // will default to the console
writer.InitializeDirectConnection(options.HostName, options.HostPort);
}
}
catch (Exception ex)
{
Console.WriteLine("Cannot connect to {0}:{1}: {2}. Continuing on console.", options.HostName, options.HostPort, ex);
writer = null; // will default to the console
}

// we generate the logs in two different ways depending if the generate xml flag was
// provided. If it was, we will write the xml file to the tcp writer if present, else
Expand Down

0 comments on commit 89cb4b1

Please sign in to comment.