diff --git a/benchmarks/AutoClusterBenchmark/Program.cs b/benchmarks/AutoClusterBenchmark/Program.cs index ad1086342f..4683571668 100644 --- a/benchmarks/AutoClusterBenchmark/Program.cs +++ b/benchmarks/AutoClusterBenchmark/Program.cs @@ -29,15 +29,16 @@ public static class Program private static object Request = new HelloRequest(); public static async Task Main(string[] args) - { + { ThreadPool.SetMinThreads(0, 0); - foreach (var batchSize in new[] { 100, 150, 200, 250, 300 }) + + foreach (var batchSize in new[] {100, 150, 200, 250, 300}) { Configuration.ResetAgent(); ResetCounters(); - + var cluster = await Configuration.SpawnClient(); - + var elapsed = await RunWorkers(() => new RunMemberInProcGraceful(), () => RunBatchClient(batchSize, cluster)); var tps = requestCount / elapsed.TotalMilliseconds * 1000; Console.WriteLine(); @@ -47,7 +48,7 @@ public static async Task Main(string[] args) Console.WriteLine($"Failures:\t{failureCount:N0}"); Console.WriteLine($"Throughput:\t{tps:N0} requests/sec -> {(tps * 2):N0} msg/sec"); await cluster.ShutdownAsync(); - + await Task.Delay(5000); } } @@ -70,11 +71,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance try { - - var x = await cluster.RequestAsync(id, Request, context, cancellationToken); - - if (x != null) + try { + await cluster.RequestAsync(id, Request, context, cancellationToken); + var res = Interlocked.Increment(ref successCount); if (res % 10000 == 0) @@ -86,6 +86,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance return; } + catch (TimeoutException) + { + // ignored + } OnError(); } @@ -109,16 +113,16 @@ void OnError() private static void RunBatchClient(int batchSize, Cluster cluster) { var identities = new ClusterIdentity[actorCount]; + for (var i = 0; i < actorCount; i++) { var id = "myactor" + i; - identities[i] = ClusterIdentity.Create(id,"hello"); + identities[i] = ClusterIdentity.Create(id, "hello"); } - + var logger = Log.CreateLogger(nameof(Program)); _ = SafeTask.Run(() => { - var rnd = new Random(); var semaphore = new AsyncSemaphore(5); @@ -139,7 +143,8 @@ async Task RunBatch(Random? rnd, Cluster cluster) { var ct = CancellationTokens.FromSeconds(20); - var ctx = cluster.System.Root.CreateBatchContext(batchSize,ct); + var ctx = cluster.System.Root.CreateBatchContext(batchSize, ct); + for (var i = 0; i < batchSize; i++) { var id = identities[rnd!.Next(0, actorCount)]; diff --git a/benchmarks/ClusterBenchmark/Program.cs b/benchmarks/ClusterBenchmark/Program.cs index f20401898e..c73273ef72 100644 --- a/benchmarks/ClusterBenchmark/Program.cs +++ b/benchmarks/ClusterBenchmark/Program.cs @@ -39,13 +39,13 @@ public static async Task Main(string[] args) if (args.Length > 0) { // InteractiveOutput = args[0] == "1"; - + var l = typeof(Program).Assembly.Location; Console.WriteLine($"Worker running {l}"); var worker = await Configuration.SpawnMember(); AppDomain.CurrentDomain.ProcessExit += (sender, args) => { worker.ShutdownAsync().Wait(); }; Thread.Sleep(Timeout.Infinite); - + return; } @@ -154,7 +154,6 @@ public static async Task Main(string[] args) private static void RunNoopClient() { - } private static void RunFireForgetClient() @@ -166,6 +165,7 @@ private static void RunFireForgetClient() var cluster = await Configuration.SpawnClient(); // var rnd = new Random(); var i = 0; + while (true) { var id = "myactor" + (i++ % actorCount); @@ -186,10 +186,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance try { - var x = await cluster.RequestAsync(id, Request, context, cancellationToken); - - if (x != null) + try { + await cluster.RequestAsync(id, Request, context, cancellationToken); + var res = Interlocked.Increment(ref successCount); if (res % 10000 == 0) @@ -201,6 +201,10 @@ private static async Task SendRequest(Cluster cluster, ClusterIdentity id, Cance return; } + catch (TimeoutException) + { + // ignored + } OnError(); } @@ -218,7 +222,7 @@ void OnError() Console.ResetColor(); } } - + private static async Task SendRequest(Cluster cluster, string id, CancellationToken cancellationToken, ISenderContext? context = null) { Interlocked.Increment(ref requestCount); @@ -230,10 +234,10 @@ private static async Task SendRequest(Cluster cluster, string id, Cancella try { - var x = await cluster.RequestAsync(id, "hello", Request, context, cancellationToken); - - if (x != null) + try { + await cluster.RequestAsync(id, "hello", Request, context, cancellationToken); + var res = Interlocked.Increment(ref successCount); if (res % 10000 == 0) @@ -245,6 +249,10 @@ private static async Task SendRequest(Cluster cluster, string id, Cancella return true; } + catch (TimeoutException) + { + // ignored + } OnError(); } @@ -268,12 +276,13 @@ void OnError() private static void RunBatchClient(int batchSize) { var identities = new ClusterIdentity[actorCount]; + for (var i = 0; i < actorCount; i++) { var id = "myactor" + i; - identities[i] = ClusterIdentity.Create(id,"hello"); + identities[i] = ClusterIdentity.Create(id, "hello"); } - + var logger = Log.CreateLogger(nameof(Program)); _ = SafeTask.Run(async () => { @@ -281,6 +290,7 @@ private static void RunBatchClient(int batchSize) // var rnd = new Random(); var semaphore = new AsyncSemaphore(5); var i = 0; + while (true) { var b = i; @@ -298,7 +308,8 @@ async Task RunBatch(int startIndex, Cluster cluster) { var ct = CancellationTokens.FromSeconds(20); - var ctx = cluster.System.Root.CreateBatchContext(batchSize,ct); + var ctx = cluster.System.Root.CreateBatchContext(batchSize, ct); + for (var i = 0; i < batchSize; i++) { var id = identities[(startIndex + i) % identities.Length]; @@ -315,7 +326,7 @@ async Task RunBatch(int startIndex, Cluster cluster) } } } - + private static void RunDebugClient() { var logger = Log.CreateLogger(nameof(Program)); @@ -332,7 +343,7 @@ private static void RunDebugClient() if (!res) { - var pid = await cluster.GetAsync(ClusterIdentity.Create(id,"hello"),CancellationTokens.FromSeconds(10)); + var pid = await cluster.GetAsync(ClusterIdentity.Create(id, "hello"), CancellationTokens.FromSeconds(10)); if (pid != null) { diff --git a/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs index 148f91512d..ca3a72754d 100644 --- a/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/ConcurrentSpawnBenchmark.cs @@ -96,7 +96,19 @@ public void RemoveActivations() tasks[i] = _cluster.RequestAsync(id, PoisonPill.Instance, cts.Token); } - Task.WhenAll(tasks).GetAwaiter().GetResult(); + try + { + Task.WhenAll(tasks).GetAwaiter().GetResult(); + } + catch (TimeoutException) + { + // ignore + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } } public enum IdentityLookup diff --git a/src/Proto.Cluster.CodeGen/Template.cs b/src/Proto.Cluster.CodeGen/Template.cs index caa8e3760e..e4889c6bc7 100644 --- a/src/Proto.Cluster.CodeGen/Template.cs +++ b/src/Proto.Cluster.CodeGen/Template.cs @@ -95,7 +95,7 @@ public class {{Name}}Client GrainResponseMessage grainResponse => {{#if UseReturn}}({{OutputName}}?)grainResponse.ResponseMessage{{else}}Nothing.Instance{{/if}}, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($""Unknown response type {res.GetType().FullName}"") @@ -116,7 +116,7 @@ public class {{Name}}Client GrainResponseMessage grainResponse => {{#if UseReturn}}({{OutputName}}?)grainResponse.ResponseMessage{{else}}Nothing.Instance{{/if}}, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($""Unknown response type {res.GetType().FullName}"") diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 39293072c0..09f3bb933c 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -156,6 +156,12 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde /// public PubSubConfig PubSubConfig { get; init; } = PubSubConfig.Setup(); + /// + /// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing + /// Default is false. + /// + public bool LegacyRequestTimeoutBehavior { get; init; } + /// /// Timeout for spawning an actor in the Partition Identity Lookup. Default is 5s. /// @@ -326,6 +332,13 @@ public ClusterConfig WithHeartbeatExpiration(TimeSpan expiration) => public ClusterConfig WithPubSubConfig(PubSubConfig config) => this with {PubSubConfig = config}; + /// + /// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing + /// Default is false. + /// + public ClusterConfig WithLegacyRequestTimeoutBehavior(bool enabled = true) => + this with {LegacyRequestTimeoutBehavior = enabled}; + /// /// Creates a new /// diff --git a/src/Proto.Cluster/DefaultClusterContext.cs b/src/Proto.Cluster/DefaultClusterContext.cs index b97058b232..2dd9d5dfca 100644 --- a/src/Proto.Cluster/DefaultClusterContext.cs +++ b/src/Proto.Cluster/DefaultClusterContext.cs @@ -27,6 +27,7 @@ public class DefaultClusterContext : IClusterContext private readonly ActorSystem _system; private static readonly ILogger Logger = Log.CreateLogger(); private readonly int _requestTimeoutSeconds; + private readonly bool _legacyTimeouts; public DefaultClusterContext(Cluster cluster) { @@ -41,6 +42,7 @@ public DefaultClusterContext(Cluster cluster) i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i) ); _requestTimeoutSeconds = (int) config.ActorRequestTimeout.TotalSeconds; + _legacyTimeouts = config.LegacyRequestTimeoutBehavior; #if !NET6_0_OR_GREATER var updateInterval = TimeSpan.FromMilliseconds(Math.Min(config.ActorRequestTimeout.TotalMilliseconds / 2, 1000)); _clock = new TaskClock(config.ActorRequestTimeout, updateInterval, cluster.System.Shutdown); @@ -110,15 +112,14 @@ public DefaultClusterContext(Cluster cluster) return t1; } - if (typeof(T) == typeof(MessageEnvelope)) + if (untypedResult == null) // timeout, actual valid response cannot be null { - return (T) (object) MessageEnvelope.Wrap(task.Result); + return TimeoutOrThrow(); } - if (untypedResult == null) + if (typeof(T) == typeof(MessageEnvelope)) { - //null = timeout - return default; + return (T) (object) MessageEnvelope.Wrap(task.Result); } if (untypedResult is DeadLetterResponse) @@ -199,7 +200,7 @@ public DefaultClusterContext(Cluster cluster) Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}", clusterIdentity); } - return default!; + return TimeoutOrThrow(); } finally { @@ -212,6 +213,17 @@ void RefreshFuture() future = context.GetFuture(); lastPid = null; } + + T? TimeoutOrThrow() + { + if (_legacyTimeouts) + { + //null = timeout + return default; + } + + throw new TimeoutException("Request timed out"); + } } private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSource source, PID pid) diff --git a/src/Proto.Cluster/IClusterProvider.cs b/src/Proto.Cluster/IClusterProvider.cs index b60ff4626f..a78bcfd1c3 100644 --- a/src/Proto.Cluster/IClusterProvider.cs +++ b/src/Proto.Cluster/IClusterProvider.cs @@ -23,7 +23,8 @@ public interface IClusterProvider Task StartMemberAsync(Cluster cluster); /// - /// Starts the cluster provider in client mode. The client member does not support any kinds. + /// Starts the cluster provider in client mode. The client member does not host any virtual actors and it is not registered in the membership provider. + /// It only monitors other member's presence and allows to send messages to virtual actors hosted by other members. /// /// /// diff --git a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs index eea3ee2a5e..3025d17033 100644 --- a/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs +++ b/tests/Proto.Cluster.CodeGen.Tests/ExpectedOutput.cs @@ -111,7 +111,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => (Acme.Mysystem.Bar.GetCurrentStateResponse?)grainResponse.ResponseMessage, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -132,7 +132,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => (Acme.Mysystem.Bar.GetCurrentStateResponse?)grainResponse.ResponseMessage, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -152,7 +152,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => Nothing.Instance, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -173,7 +173,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => Nothing.Instance, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -193,7 +193,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => (Acme.Mysystem.Bar.Response?)grainResponse.ResponseMessage, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -214,7 +214,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => (Acme.Mysystem.Bar.Response?)grainResponse.ResponseMessage, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -234,7 +234,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => Nothing.Instance, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") @@ -255,7 +255,7 @@ public TestGrainClient(Cluster cluster, string id) GrainResponseMessage grainResponse => Nothing.Instance, // error response GrainErrorResponse grainErrorResponse => throw new Exception(grainErrorResponse.Err), - //timeout + // timeout (when enabled by ClusterConfig.LegacyRequestTimeoutBehavior), othwerwise TimeoutException is thrown null => null, // unsupported response _ => throw new NotSupportedException($"Unknown response type {res.GetType().FullName}") diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index 998ce0e3e9..516f002143 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -39,7 +39,7 @@ public async Task TopologiesShouldHaveConsensus() .WaitUpTo(TimeSpan.FromSeconds(20)).ConfigureAwait(false); _testOutputHelper.WriteLine(await Members.DumpClusterState()); - + consensus.completed.Should().BeTrue("All members should have gotten consensus on the same topology hash"); _testOutputHelper.WriteLine(LogStore.ToFormattedString()); } @@ -346,15 +346,21 @@ private async Task PingPong( { await Task.Yield(); - var response = await cluster.Ping(id, id, CancellationTokens.FromSeconds(4), kind); - var tries = 1; + Pong response = null; - while (response == null && !token.IsCancellationRequested) + do { - await Task.Delay(200, token); - //_testOutputHelper.WriteLine($"Retrying ping {kind}/{id}, attempt {++tries}"); - response = await cluster.Ping(id, id, CancellationTokens.FromSeconds(4), kind); - } + try + { + response = await cluster.Ping(id, id, CancellationTokens.FromSeconds(4), kind); + } + catch (TimeoutException) + { + // expected + } + + if (response == null) await Task.Delay(200, token); + } while (response == null && !token.IsCancellationRequested); response.Should().NotBeNull($"We expect a response before timeout on {kind}/{id}"); diff --git a/tests/Proto.Cluster.Tests/LegacyTimeoutTests.cs b/tests/Proto.Cluster.Tests/LegacyTimeoutTests.cs new file mode 100644 index 0000000000..441f671452 --- /dev/null +++ b/tests/Proto.Cluster.Tests/LegacyTimeoutTests.cs @@ -0,0 +1,43 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using ClusterTest.Messages; +using FluentAssertions; +using Xunit; + +namespace Proto.Cluster.Tests; + +public class LegacyTimeoutTests +{ + [Fact] + public async Task ReturnsNullOnRequestTimeoutInLegacyMode() + { + await using var fixture = new Fixture(1); + await fixture.InitializeAsync(); + + var response = await fixture.Members.First().RequestAsync(CreateIdentity("slow-test"), EchoActor.Kind, + new SlowPing {Message = "hi", DelayMs = 4000}, new CancellationTokenSource(500).Token + ); + + response.Should().BeNull(); + } + + private string CreateIdentity(string baseId) => $"{Guid.NewGuid().ToString("N").Substring(0, 6)}-{baseId}-"; + + private class Fixture : BaseInMemoryClusterFixture + { + public Fixture(int clusterSize) + : base(clusterSize, cc => cc + .WithActorRequestTimeout(TimeSpan.FromSeconds(1)) + .WithLegacyRequestTimeoutBehavior() + ) + { + } + } +} \ No newline at end of file diff --git a/tests/Proto.Cluster.Tests/TimeoutTests.cs b/tests/Proto.Cluster.Tests/TimeoutTests.cs new file mode 100644 index 0000000000..2bbb22edf1 --- /dev/null +++ b/tests/Proto.Cluster.Tests/TimeoutTests.cs @@ -0,0 +1,42 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using ClusterTest.Messages; +using FluentAssertions; +using Xunit; + +namespace Proto.Cluster.Tests; + +public class TimeoutTests +{ + [Fact] + public async Task ThrowsTimeoutExceptionOnRequestTimeout() + { + await using var fixture = new Fixture(1); + await fixture.InitializeAsync(); + + await fixture.Members.First() + .Invoking(m => m.RequestAsync(CreateIdentity("slow-test"), EchoActor.Kind, + new SlowPing {Message = "hi", DelayMs = 4000}, new CancellationTokenSource(500).Token + ) + ).Should().ThrowAsync(); + } + + private string CreateIdentity(string baseId) => $"{Guid.NewGuid().ToString("N").Substring(0, 6)}-{baseId}-"; + + private class Fixture : BaseInMemoryClusterFixture + { + public Fixture(int clusterSize) + : base(clusterSize, cc => cc + .WithActorRequestTimeout(TimeSpan.FromSeconds(1)) + ) + { + } + } +} \ No newline at end of file