From ecb28eec3722afc230a676d9ecc823c31c45c81c Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 16:38:43 +1000 Subject: [PATCH 1/5] Improve the way the read and write stream timeouts are changed for a particular operation --- .../Protocol/MessageExchangeStream.cs | 28 +++++++++-------- .../Protocol/MessageExchangeStreamTimeout.cs | 10 ++++++ .../Protocol/StreamTimeoutExtensionMethods.cs | 31 +++++++++++++------ 3 files changed, 46 insertions(+), 23 deletions(-) create mode 100644 source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 35c4b9be..e248038c 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -35,8 +35,8 @@ public MessageExchangeStream(Stream stream, IMessageSerializer serializer, Halib this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.controlMessageReader = new ControlMessageReader(halibutTimeoutsAndLimits); this.serializer = serializer; - - SetReadAndWriteTimeouts(halibutTimeoutsAndLimits.TcpClientTimeout); + + SetReadAndWriteTimeouts(MessageExchangeStreamTimeout.NormalTimeout); } static int streamCount; @@ -65,19 +65,21 @@ async Task SendIdentityMessageAsync(string identityLine, CancellationToken cance public async Task SendNextAsync(CancellationToken cancellationToken) { await WithTimeout( - halibutTimeoutsAndLimits.TcpClientHeartbeatTimeout, - async () => await SendControlMessageAsync(Next, cancellationToken)); + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => await SendControlMessageAsync(Next, cancellationToken)); } public async Task SendProceedAsync(CancellationToken cancellationToken) { - await SendControlMessageAsync(Proceed, cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => await SendControlMessageAsync(Proceed, cancellationToken)); } public async Task SendEndAsync(CancellationToken cancellationToken) { await WithTimeout( - halibutTimeoutsAndLimits.TcpClientHeartbeatTimeout, + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, async () => await SendControlMessageAsync(End, cancellationToken)); } @@ -97,7 +99,7 @@ public async Task ExpectNextOrEndAsync(CancellationToken cancellationToken public async Task ExpectProceedAsync(CancellationToken cancellationToken) { await WithTimeout( - halibutTimeoutsAndLimits.TcpClientHeartbeatTimeout, + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, async () => { var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); @@ -185,19 +187,19 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) return result; } - async Task WithTimeout(SendReceiveTimeout timeout, Func func) + async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) { - await stream.WithTimeout(timeout, func); + await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } - async Task WithTimeout(SendReceiveTimeout timeout, Func> func) + async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) { - return await stream.WithTimeout(timeout, func); + return await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } - void SetReadAndWriteTimeouts(SendReceiveTimeout timeout) + void SetReadAndWriteTimeouts(MessageExchangeStreamTimeout timeout) { - stream.SetReadAndWriteTimeouts(timeout); + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); } static RemoteIdentityType ParseIdentityType(string identityType) diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs new file mode 100644 index 00000000..d79b5bed --- /dev/null +++ b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs @@ -0,0 +1,10 @@ +using System; + +namespace Halibut.Transport.Protocol +{ + public enum MessageExchangeStreamTimeout + { + NormalTimeout, + ControlMessageExchangeShortTimeout + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs index b3da1505..3ea7f36d 100644 --- a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs +++ b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs @@ -7,7 +7,7 @@ namespace Halibut.Transport.Protocol { public static class StreamTimeoutExtensionMethods { - public static async Task WithTimeout(this Stream stream, SendReceiveTimeout timeout, Func func) + public static async Task WithTimeout(this Stream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func func) { if (!stream.CanTimeout) { @@ -21,7 +21,7 @@ public static async Task WithTimeout(this Stream stream, SendReceiveTimeout time try { - stream.SetReadAndWriteTimeouts(timeout); + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); await func(); } finally @@ -30,8 +30,8 @@ public static async Task WithTimeout(this Stream stream, SendReceiveTimeout time stream.WriteTimeout = currentWriteTimeout; } } - - public static async Task WithTimeout(this Stream stream, SendReceiveTimeout timeout, Func> func) + + public static async Task WithTimeout(this Stream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func> func) { if (!stream.CanTimeout) { @@ -43,7 +43,7 @@ public static async Task WithTimeout(this Stream stream, SendReceiveTimeou try { - stream.SetReadAndWriteTimeouts(timeout); + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); return await func(); } finally @@ -52,16 +52,27 @@ public static async Task WithTimeout(this Stream stream, SendReceiveTimeou stream.WriteTimeout = currentWriteTimeout; } } - - public static void SetReadAndWriteTimeouts(this Stream stream, SendReceiveTimeout timeout) + + public static void SetReadAndWriteTimeouts(this Stream stream, MessageExchangeStreamTimeout timeout, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits) { if (!stream.CanTimeout) { return; } - - stream.WriteTimeout = (int)timeout.SendTimeout.TotalMilliseconds; - stream.ReadTimeout = (int)timeout.ReceiveTimeout.TotalMilliseconds; + + switch (timeout) + { + case MessageExchangeStreamTimeout.NormalTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; + break; + default: + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); + } } } } From 5684fdcfc0c801e68f1e34b6d94a12fab39cef08 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 15:19:56 +1000 Subject: [PATCH 2/5] Add an auth short timeout Add a polling for next request short timeout --- ...HalibutTimeoutsAndLimitsForTestsBuilder.cs | 17 ++++-- .../Support/SerilogLoggerBuilder.cs | 12 ++--- .../Transport/Protocol/ProtocolFixture.cs | 14 +++++ .../Diagnostics/HalibutTimeoutsAndLimits.cs | 13 +++-- .../Protocol/IMessageExchangeStream.cs | 3 ++ .../Protocol/MessageExchangeProtocol.cs | 4 +- .../Protocol/MessageExchangeStream.cs | 54 +++++++++++++------ .../Protocol/MessageExchangeStreamTimeout.cs | 4 +- .../Protocol/StreamTimeoutExtensionMethods.cs | 8 +++ .../TcpClientTimeoutExtensionMethods.cs | 53 ++++++++++++++++++ source/Halibut/Transport/SecureListener.cs | 2 +- .../Halibut/Transport/TcpConnectionFactory.cs | 15 +++--- 12 files changed, 157 insertions(+), 42 deletions(-) create mode 100644 source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs diff --git a/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs b/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs index d1db9676..c2d8c360 100644 --- a/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs +++ b/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs @@ -6,6 +6,8 @@ namespace Halibut.Tests public class HalibutTimeoutsAndLimitsForTestsBuilder { public static readonly TimeSpan HalfTheTcpReceiveTimeout = TimeSpan.FromSeconds(22.5); + static readonly TimeSpan PollingQueueWaitTimeout = TimeSpan.FromSeconds(20); + static readonly TimeSpan ShortTimeout = TimeSpan.FromSeconds(15); public HalibutTimeoutsAndLimits Build() { @@ -22,13 +24,18 @@ public HalibutTimeoutsAndLimits Build() TcpClientTimeout = new( sendTimeout: HalfTheTcpReceiveTimeout + HalfTheTcpReceiveTimeout, receiveTimeout: HalfTheTcpReceiveTimeout + HalfTheTcpReceiveTimeout), - - TcpClientHeartbeatTimeout = new( - sendTimeout: TimeSpan.FromSeconds(15), - receiveTimeout: TimeSpan.FromSeconds(15)), + + TcpClientHeartbeatSendTimeout = ShortTimeout, + TcpClientHeartbeatReceiveTimeout = ShortTimeout, + + TcpClientAuthenticationSendTimeout = ShortTimeout, + TcpClientAuthenticationReceiveTimeout = ShortTimeout, + + TcpClientPollingForNextRequestSendTimeout = ShortTimeout, + TcpClientPollingForNextRequestReceiveTimeout = PollingQueueWaitTimeout + ShortTimeout, TcpClientConnectTimeout = TimeSpan.FromSeconds(20), - PollingQueueWaitTimeout = TimeSpan.FromSeconds(20) + PollingQueueWaitTimeout = PollingQueueWaitTimeout }; } } diff --git a/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs b/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs index d447c312..52db0f7e 100644 --- a/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs +++ b/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs @@ -57,18 +57,18 @@ public ILogger Build() var testHash = CurrentTestHash(); var logger = Logger.ForContext("TestHash", testHash); - if (!HasLoggedTestHash.Contains(testName)) - { - HasLoggedTestHash.Add(testName); - logger.Information($"Test: {TestContext.CurrentContext.Test.Name} has hash {testHash}"); - } - if (traceFileLogger != null) { TraceLoggers.AddOrUpdate(testName, traceFileLogger, (_, _) => throw new Exception("This should never be updated. If it is, it means that a test is being run multiple times in a single test run")); traceFileLogger.SetTestHash(testHash); } + if (!HasLoggedTestHash.Contains(testName)) + { + HasLoggedTestHash.Add(testName); + logger.Information($"Test: {TestContext.CurrentContext.Test.Name} has hash {testHash}"); + } + return logger; } diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 87c5cf6a..7de27ab7 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -371,6 +371,20 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) return (T)(nextReadQueue.Count > 0 ? nextReadQueue.Dequeue() : default(T)); } + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) + { + output.AppendLine("|-- Set Timeout " + timeout); + + await func(); + } + + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) + { + output.AppendLine("|-- Set Timeout " + timeout); + + return await func(); + } + public override string ToString() { return output.ToString(); diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index fbc79953..7eaa6e49 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -53,11 +53,14 @@ public HalibutTimeoutsAndLimits() { } /// Amount of time a connection can stay in the pool /// public TimeSpan TcpClientPooledConnectionTimeout { get; set; } = TimeSpan.FromMinutes(9); - - /// - /// Amount of time to wait for a TCP or SslStream read/write to complete successfully for a control message - /// - public SendReceiveTimeout TcpClientHeartbeatTimeout { get; set; } = new(sendTimeout: TimeSpan.FromSeconds(60), receiveTimeout: TimeSpan.FromSeconds(60)); + + public TimeSpan TcpClientHeartbeatSendTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientHeartbeatReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); + + public TimeSpan TcpClientAuthenticationSendTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientAuthenticationReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestSendTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(30) + TimeSpan.FromSeconds(60); /// /// Amount of time to wait for a successful TCP or WSS connection diff --git a/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs b/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs index 2b6e4b12..364bc8ce 100644 --- a/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs @@ -26,5 +26,8 @@ public interface IMessageExchangeStream Task SendAsync(T message, CancellationToken cancellationToken); Task ReceiveAsync(CancellationToken cancellationToken); + + Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func); + Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func); } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 1a79bd68..60957702 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -93,7 +93,9 @@ public async Task ExchangeAsSubscriberAsync(Uri subscriptionId, Func> incomingRequestProcessor, CancellationToken cancellationToken) { - var request = await stream.ReceiveAsync(cancellationToken); + var request = await stream.WithTimeout( + MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout, + async () => await stream.ReceiveAsync(cancellationToken)); if (request != null) { diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index e248038c..8dbb93bb 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -43,9 +43,14 @@ public MessageExchangeStream(Stream stream, IMessageSerializer serializer, Halib public async Task IdentifyAsClientAsync(CancellationToken cancellationToken) { - log.Write(EventType.Diagnostic, "Identifying as a client"); - await SendIdentityMessageAsync($"{MxClient} {currentVersion}", cancellationToken); - await ExpectServerIdentityAsync(cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + log.Write(EventType.Diagnostic, "Identifying as a client"); + await SendIdentityMessageAsync($"{MxClient} {currentVersion}", cancellationToken); + await ExpectServerIdentityAsync(cancellationToken); + }); } async Task SendControlMessageAsync(string message, CancellationToken cancellationToken) @@ -85,15 +90,20 @@ await WithTimeout( public async Task ExpectNextOrEndAsync(CancellationToken cancellationToken) { - var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); - - return line switch - { - Next => true, - null => false, - End => false, - _ => throw new ProtocolException($"Expected {Next} or {End}, got: " + line) - }; + return await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => + { + var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); + + return line switch + { + Next => true, + null => false, + End => false, + _ => throw new ProtocolException($"Expected {Next} or {End}, got: " + line) + }; + }); } public async Task ExpectProceedAsync(CancellationToken cancellationToken) @@ -118,13 +128,23 @@ await WithTimeout( public async Task IdentifyAsSubscriberAsync(string subscriptionId, CancellationToken cancellationToken) { - await SendIdentityMessageAsync($"{MxSubscriber} {currentVersion} {subscriptionId}", cancellationToken); - await ExpectServerIdentityAsync(cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + await SendIdentityMessageAsync($"{MxSubscriber} {currentVersion} {subscriptionId}", cancellationToken); + await ExpectServerIdentityAsync(cancellationToken); + }); } public async Task IdentifyAsServerAsync(CancellationToken cancellationToken) { - await SendIdentityMessageAsync($"{MxServer} {currentVersion}", cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + await SendIdentityMessageAsync($"{MxServer} {currentVersion}", cancellationToken); + }); } public async Task ReadRemoteIdentityAsync(CancellationToken cancellationToken) @@ -187,12 +207,12 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) return result; } - async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) { await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } - async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) { return await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs index d79b5bed..4e7ecb00 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs @@ -5,6 +5,8 @@ namespace Halibut.Transport.Protocol public enum MessageExchangeStreamTimeout { NormalTimeout, - ControlMessageExchangeShortTimeout + ControlMessageExchangeShortTimeout, + AuthenticationShortTimeout, + PollingForNextRequestShortTimeout } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs index 3ea7f36d..e5ac23ac 100644 --- a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs +++ b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs @@ -70,6 +70,14 @@ public static void SetReadAndWriteTimeouts(this Stream stream, MessageExchangeSt stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; break; + case MessageExchangeStreamTimeout.AuthenticationShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestReceiveTimeout.TotalMilliseconds; + break; default: throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); } diff --git a/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs new file mode 100644 index 00000000..a0cb4365 --- /dev/null +++ b/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs @@ -0,0 +1,53 @@ +using System; +using System.IO; +using System.Net.Sockets; +using System.Threading.Tasks; +using Halibut.Diagnostics; + +namespace Halibut.Transport.Protocol +{ + public static class TcpClientTimeoutExtensionMethods + { + public static async Task WithTimeout(this TcpClient stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func func) + { + var currentReadTimeout = stream.Client.ReceiveTimeout; + var currentWriteTimeout = stream.Client.SendTimeout; + + try + { + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); + await func(); + } + finally + { + stream.ReceiveTimeout = currentReadTimeout; + stream.SendTimeout = currentWriteTimeout; + } + } + + public static void SetReadAndWriteTimeouts(this TcpClient stream, MessageExchangeStreamTimeout timeout, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits) + { + switch (timeout) + { + case MessageExchangeStreamTimeout.NormalTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.AuthenticationShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestReceiveTimeout.TotalMilliseconds; + break; + default: + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); + } + } + } +} diff --git a/source/Halibut/Transport/SecureListener.cs b/source/Halibut/Transport/SecureListener.cs index 1be704d9..5a7b066b 100644 --- a/source/Halibut/Transport/SecureListener.cs +++ b/source/Halibut/Transport/SecureListener.cs @@ -276,7 +276,7 @@ async Task ExecuteRequest(TcpClient client) finally { if (!connectionAuthorizedAndObserved) - { + { connectionsObserver.ConnectionAccepted(false); } diff --git a/source/Halibut/Transport/TcpConnectionFactory.cs b/source/Halibut/Transport/TcpConnectionFactory.cs index d83a47ca..0efb8f7b 100644 --- a/source/Halibut/Transport/TcpConnectionFactory.cs +++ b/source/Halibut/Transport/TcpConnectionFactory.cs @@ -43,16 +43,19 @@ public async Task EstablishNewConnectionAsync(ExchangeProtocolBuild log.Write(EventType.SecurityNegotiation, "Performing TLS handshake"); + await client.WithTimeout(halibutTimeoutsAndLimits, MessageExchangeStreamTimeout.AuthenticationShortTimeout, async () => + { #if NETFRAMEWORK - // TODO: ASYNC ME UP! - // AuthenticateAsClientAsync in .NET 4.8 does not support cancellation tokens. So `cancellationToken` is not respected here. - await ssl.AuthenticateAsClientAsync(serviceEndpoint.BaseUri.Host, new X509Certificate2Collection(clientCertificate), SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false); + // TODO: ASYNC ME UP! + // AuthenticateAsClientAsync in .NET 4.8 does not support cancellation tokens. So `cancellationToken` is not respected here. + await ssl.AuthenticateAsClientAsync(serviceEndpoint.BaseUri.Host, new X509Certificate2Collection(clientCertificate), SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false); #else - await ssl.AuthenticateAsClientEnforcingTimeout(serviceEndpoint, new X509Certificate2Collection(clientCertificate), cancellationToken); + await ssl.AuthenticateAsClientEnforcingTimeout(serviceEndpoint, new X509Certificate2Collection(clientCertificate), cancellationToken); #endif - await ssl.WriteAsync(MxLine, 0, MxLine.Length, cancellationToken); - await ssl.FlushAsync(cancellationToken); + await ssl.WriteAsync(MxLine, 0, MxLine.Length, cancellationToken); + await ssl.FlushAsync(cancellationToken); + }); log.Write(EventType.Security, "Secure connection established. Server at {0} identified by thumbprint: {1}, using protocol {2}", client.Client.RemoteEndPoint, serviceEndpoint.RemoteThumbprint, ssl.SslProtocol.ToString()); From a454fa456482153778595083582156a72fb00563 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 17:18:08 +1000 Subject: [PATCH 3/5] . --- .../Timeouts/TimeoutsApplyDuringHandShake.cs | 2 +- .../Transport/Protocol/ProtocolFixture.cs | 45 +++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs b/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs index 60127efc..a51decb2 100644 --- a/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs +++ b/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs @@ -64,7 +64,7 @@ int writeNumberToPauseOn // Ie pause on the first or second write } sw.Stop(); - sw.Elapsed.Should().BeCloseTo(clientAndService.Service.TimeoutsAndLimits.TcpClientTimeout.ReceiveTimeout, TimeSpan.FromSeconds(15), "Since a paused connection early on should not hang forever."); + sw.Elapsed.Should().BeCloseTo(clientAndService.Service.TimeoutsAndLimits.TcpClientAuthenticationSendTimeout, TimeSpan.FromSeconds(15), "Since a paused connection early on should not hang forever."); await echo.SayHelloAsync("The pump wont be paused here so this should work."); } diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 7de27ab7..0b5fb9dd 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -121,22 +121,32 @@ public async Task ShouldExchangeAsSubscriber() AssertOutput(@" --> MX-SUBSCRIBE subscriptionId <-- MX-SERVER +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED"); } @@ -179,37 +189,57 @@ public async Task ShouldExchangeAsSubscriberWithPooling() AssertOutput(@" --> MX-SUBSCRIBE subscriptionId <-- MX-SERVER +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED"); } @@ -279,17 +309,12 @@ public void SetNumberOfReads(int reads) numberOfReads = reads; } - public void IdentifyAsClient() - { - output.AppendLine("--> MX-CLIENT"); - output.AppendLine("<-- MX-SERVER"); - } - public async Task IdentifyAsClientAsync(CancellationToken cancellationToken) { await Task.CompletedTask; - IdentifyAsClient(); + output.AppendLine("--> MX-CLIENT"); + output.AppendLine("<-- MX-SERVER"); } public async Task SendNextAsync(CancellationToken cancellationToken) @@ -374,15 +399,17 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) { output.AppendLine("|-- Set Timeout " + timeout); - await func(); + output.AppendLine("|-- Revert Timeout " + timeout); } public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) { output.AppendLine("|-- Set Timeout " + timeout); + var response = await func(); + output.AppendLine("|-- Revert Timeout " + timeout); - return await func(); + return response; } public override string ToString() From 516412c37718fb4e19ed364d29bab0b7c2e04b12 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 17:21:23 +1000 Subject: [PATCH 4/5] . --- .../Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index 7eaa6e49..faa9cb2e 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -47,12 +47,17 @@ public HalibutTimeoutsAndLimits() { } /// /// Amount of time to wait for a TCP or SslStream read/write to complete successfully /// - public SendReceiveTimeout TcpClientTimeout { get; set; } = new(sendTimeout: TimeSpan.FromMinutes(10), receiveTimeout: TimeSpan.FromMinutes(10)); + public TimeSpan TcpClientSendTimeout { get; set; } = TimeSpan.FromMinutes(1); + + /// + /// Amount of time to wait for a TCP or SslStream read to complete successfully + /// + public TimeSpan TcpClientReceiveTimeout { get; set; } = TimeSpan.FromMinutes(5); /// /// Amount of time a connection can stay in the pool /// - public TimeSpan TcpClientPooledConnectionTimeout { get; set; } = TimeSpan.FromMinutes(9); + public TimeSpan TcpClientPooledConnectionTimeout { get; set; } = TimeSpan.FromMinutes(4.5); public TimeSpan TcpClientHeartbeatSendTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientHeartbeatReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); @@ -60,7 +65,7 @@ public HalibutTimeoutsAndLimits() { } public TimeSpan TcpClientAuthenticationSendTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientAuthenticationReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientPollingForNextRequestSendTimeout { get; set; } = TimeSpan.FromSeconds(60); - public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(30) + TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); /// /// Amount of time to wait for a successful TCP or WSS connection From f6110bae7ecb43845068c6a14b3c42389c795c7d Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Mon, 20 Nov 2023 15:01:47 +1100 Subject: [PATCH 5/5] Fix rebase mistake --- source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index faa9cb2e..f4f59ce8 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -90,13 +90,13 @@ public TimeSpan SafeTcpClientPooledConnectionTimeout { get { - if (TcpClientPooledConnectionTimeout < TcpClientTimeout.ReceiveTimeout) + if (TcpClientPooledConnectionTimeout < TcpClientReceiveTimeout) { return TcpClientPooledConnectionTimeout; } - var timeout = TcpClientTimeout.ReceiveTimeout - TimeSpan.FromSeconds(10); - return timeout > TimeSpan.Zero ? timeout : TcpClientTimeout.ReceiveTimeout; + var timeout = TcpClientReceiveTimeout - TimeSpan.FromSeconds(10); + return timeout > TimeSpan.Zero ? timeout : TcpClientReceiveTimeout; } }