diff --git a/benchmarks/AutoClusterBenchmark/Configuration.cs b/benchmarks/AutoClusterBenchmark/Configuration.cs index 25d4a003e5..09a230a538 100644 --- a/benchmarks/AutoClusterBenchmark/Configuration.cs +++ b/benchmarks/AutoClusterBenchmark/Configuration.cs @@ -37,7 +37,7 @@ IIdentityLookup identityLookup var helloProps = Props.FromProducer(() => new WorkerActor()); return ClusterConfig .Setup("mycluster", clusterProvider, identityLookup) - .WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster)) + .WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)) .WithClusterKind("hello", helloProps) .WithGossipFanOut(3); } @@ -119,6 +119,7 @@ public static async Task SpawnMember() private static ActorSystemConfig GetMemberActorSystemConfig() { var config = new ActorSystemConfig() + .WithSharedFutures() .WithDeadLetterThrottleCount(3) .WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1)) .WithDeadLetterRequestLogging(false); diff --git a/benchmarks/ClusterBenchmark/Configuration.cs b/benchmarks/ClusterBenchmark/Configuration.cs index 0710f7f18b..38e6cbf13e 100644 --- a/benchmarks/ClusterBenchmark/Configuration.cs +++ b/benchmarks/ClusterBenchmark/Configuration.cs @@ -69,7 +69,7 @@ IIdentityLookup identityLookup var helloProps = Props.FromProducer(() => new WorkerActor()); return ClusterConfig .Setup("mycluster", clusterProvider, identityLookup) - .WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster)) + .WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)) .WithClusterKind("hello", helloProps) .WithGossipFanOut(3); } @@ -182,7 +182,7 @@ public static async Task SpawnMember() private static ActorSystemConfig GetMemberActorSystemConfig() { var config = new ActorSystemConfig() - // .WithSharedFutures() + .WithSharedFutures() .WithDeadLetterThrottleCount(3) .WithDeadLetterThrottleInterval(TimeSpan.FromSeconds(1)) .WithDeadLetterRequestLogging(false); diff --git a/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs b/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs index 0a6d7b64b3..2f66e5650b 100644 --- a/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs +++ b/benchmarks/ClusterMicroBenchmarks/InProcessClusterBatchRequestBenchmark.cs @@ -73,7 +73,7 @@ private ClusterConfig ClusterConfig() if (ExperimentalContext) { - config = config.WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster)); + config = config.WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)); } return config; diff --git a/src/Proto.Actor/Configuration/ActorSystemConfig.cs b/src/Proto.Actor/Configuration/ActorSystemConfig.cs index 8106f5b698..ed0b5fbe52 100644 --- a/src/Proto.Actor/Configuration/ActorSystemConfig.cs +++ b/src/Proto.Actor/Configuration/ActorSystemConfig.cs @@ -74,7 +74,7 @@ public record ActorSystemConfig /// SharedFutures allows the ActorSystem to avoid registering a new temporary process for each request /// Instead registering a SharedFuture that can handle multiple requests internally /// - public bool SharedFutures { get; init; } + public bool SharedFutures { get; init; } = true; /// /// Sets the number of requests that can be handled by a SharedFuture diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 76548a7a1e..32b0cd4a5e 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -63,8 +63,7 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde public TimeSpan RemotePidCacheTimeToLive { get; set; } public TimeSpan RemotePidCacheClearInterval { get; set; } - public Func ClusterContextProducer { get; init; } = - c => new DefaultClusterContext(c.System, c.IdentityLookup, c.PidCache, c.Config.ToClusterContextConfig(), c.System.Shutdown); + public Func ClusterContextProducer { get; init; } = c => new DefaultClusterContext(c); public TimeSpan ActorRequestRetryInterval { get; init; } public ClusterConfig WithTimeout(TimeSpan timeSpan) => diff --git a/src/Proto.Cluster/DefaultClusterContext.cs b/src/Proto.Cluster/DefaultClusterContext.cs index 52d3a7a159..1019292adf 100644 --- a/src/Proto.Cluster/DefaultClusterContext.cs +++ b/src/Proto.Cluster/DefaultClusterContext.cs @@ -1,5 +1,5 @@ // ----------------------------------------------------------------------- -// +// // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- @@ -10,7 +10,6 @@ using Microsoft.Extensions.Logging; using Proto.Cluster.Identity; using Proto.Cluster.Metrics; -using Proto.Future; using Proto.Utils; namespace Proto.Cluster; @@ -23,35 +22,26 @@ public class DefaultClusterContext : IClusterContext private readonly ShouldThrottle _requestLogThrottle; private readonly TaskClock _clock; private readonly ActorSystem _system; - private static readonly ILogger Logger = Log.CreateLogger(); - public DefaultClusterContext( - ActorSystem system, - IIdentityLookup identityLookup, - PidCache pidCache, - ClusterContextConfig config, - CancellationToken killSwitch - ) + public DefaultClusterContext(Cluster cluster) { - _identityLookup = identityLookup; - _pidCache = pidCache; - _system = system; + _identityLookup = cluster.IdentityLookup; + _pidCache = cluster.PidCache; + var config = cluster.Config; + _system = cluster.System; _requestLogThrottle = Throttle.Create( config.MaxNumberOfEventsInRequestLogThrottlePeriod, config.RequestLogThrottlePeriod, i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i) ); - - _clock = new TaskClock(config.ActorRequestTimeout, config.ActorRequestRetryInterval, killSwitch); + _clock = new TaskClock(config.ActorRequestTimeout, TimeSpan.FromSeconds(1), cluster.System.Shutdown); _clock.Start(); } public async Task RequestAsync(ClusterIdentity clusterIdentity, object message, ISenderContext context, CancellationToken ct) { - var start = Stopwatch.StartNew(); - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} Message {Message}", clusterIdentity, message); var i = 0; var future = context.GetFuture(); @@ -59,12 +49,17 @@ CancellationToken killSwitch try { - while (!ct.IsCancellationRequested) + while (!ct.IsCancellationRequested && !context.System.Shutdown.IsCancellationRequested) { - if (context.System.Shutdown.IsCancellationRequested) return default; + i++; var source = PidSource.Cache; - var pid = GetCachedPid(clusterIdentity); + var pid = clusterIdentity.CachedPid; + + if (pid == null) + { + pid = GetCachedPid(clusterIdentity); + } if (pid is null) { @@ -72,42 +67,86 @@ CancellationToken killSwitch pid = await GetPidFromLookup(clusterIdentity, context, ct); } - if (context.System.Shutdown.IsCancellationRequested) return default; - if (pid is null) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} - Did not get PID from IdentityLookup", clusterIdentity); - await Task.Delay(++i * 20, CancellationToken.None); + await Task.Delay(i * 20, CancellationToken.None); continue; } // Ensures that a future is not re-used against another actor. if (lastPid is not null && !pid.Equals(lastPid)) RefreshFuture(); - // Logger.LogDebug("Requesting {ClusterIdentity} - Got PID {Pid} from {Source}", clusterIdentity, pid, source); - var (status, res) = await TryRequestAsync(clusterIdentity, message, pid, source, context, future); + Stopwatch t = null!; - switch (status) + if (context.System.Metrics.Enabled) + { + t=Stopwatch.StartNew(); + } + + try + { + context.Request(pid, message, future.Pid); + var task = future.Task; + + await Task.WhenAny(task, _clock.CurrentBucket); + + if (task.IsCompleted) + { + var (status, result) = ToResult(source, context, task.Result); + + switch (status) + { + case ResponseStatus.Ok: return result; + case ResponseStatus.InvalidResponse: + RefreshFuture(); + await RemoveFromSource(clusterIdentity, source, pid); + break; + case ResponseStatus.DeadLetter: + RefreshFuture(); + await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid); + break; + } + } + else + { + if (!context.System.Shutdown.IsCancellationRequested) + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source); + _pidCache.RemoveByVal(clusterIdentity, pid); + } + } + catch (TimeoutException) + { + lastPid = pid; + await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); + continue; + } + catch (Exception x) { - case ResponseStatus.Ok: - return res; - - case ResponseStatus.Exception: - RefreshFuture(); - await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); - await Task.Delay(++i * 20, CancellationToken.None); - break; - case ResponseStatus.DeadLetter: - RefreshFuture(); - await RemoveFromSource(clusterIdentity, source, pid); - break; - case ResponseStatus.TimedOut: - lastPid = pid; - await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); - break; + x.CheckFailFast(); + if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); + _pidCache.RemoveByVal(clusterIdentity, pid); + RefreshFuture(); + await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); + await Task.Delay(i * 20, CancellationToken.None); + continue; + } + finally + { + if (context.System.Metrics.Enabled) + { + var elapsed = t.Elapsed; + ClusterMetrics.ClusterRequestDuration + .Record(elapsed.TotalSeconds, + new("id", _system.Id), new("address", _system.Address), + new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name), + new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup") + ); + } } - if (_system.Metrics.Enabled) + if (context.System.Metrics.Enabled) { ClusterMetrics.ClusterRequestRetryCount.Add( 1, new("id", _system.Id), new("address", _system.Address), @@ -118,8 +157,7 @@ CancellationToken killSwitch if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) { - var t = start.Elapsed; - Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}, elapsed {Time}", clusterIdentity, t); + Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}", clusterIdentity); } return default!; @@ -144,6 +182,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou _pidCache.RemoveByVal(clusterIdentity, pid); } + + private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null; + private async Task GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct) { try @@ -177,81 +218,9 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou } } - private async ValueTask<(ResponseStatus Status, T?)> TryRequestAsync( - ClusterIdentity clusterIdentity, - object message, - PID pid, - PidSource source, - ISenderContext context, - IFuture future - ) - { - var t = Stopwatch.StartNew(); - - try - { - context.Request(pid, message, future.Pid); - var task = future.Task; - - await Task.WhenAny(task, _clock.CurrentBucket); - - if (task.IsCompleted) - { - var res = task.Result; - - return ToResult(source, context, res); - } - - if (!context.System.Shutdown.IsCancellationRequested) - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source); - - _pidCache.RemoveByVal(clusterIdentity, pid); - - return (ResponseStatus.TimedOut, default); - } - catch (TimeoutException) - { - return (ResponseStatus.TimedOut, default); - } - catch (Exception x) - { - x.CheckFailFast(); - if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); - _pidCache.RemoveByVal(clusterIdentity, pid); - return (ResponseStatus.Exception, default); - } - finally - { - if (context.System.Metrics.Enabled) - { - var elapsed = t.Elapsed; - ClusterMetrics.ClusterRequestDuration - .Record(elapsed.TotalSeconds, - new("id", _system.Id), new("address", _system.Address), - new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name), - new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup") - ); - } - } - } - - private PID? GetCachedPid(ClusterIdentity clusterIdentity) - { - var pid = clusterIdentity.CachedPid; - - if (pid is null && _pidCache.TryGet(clusterIdentity, out pid)) - { - clusterIdentity.CachedPid = pid; - } - - return pid; - } - - private static (ResponseStatus Status, T?) ToResult(PidSource source, ISenderContext context, object result) + private static (ResponseStatus Ok, T?) ToResult(PidSource source, ISenderContext context, object result) { var message = MessageEnvelope.UnwrapMessage(result); - switch (message) { case DeadLetterResponse: @@ -266,16 +235,15 @@ private static (ResponseStatus Status, T?) ToResult(PidSource source, ISender { return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result)); } - Logger.LogWarning("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T)); - return (ResponseStatus.Exception, default); + Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T)); + return (ResponseStatus.InvalidResponse, default); } } private enum ResponseStatus { Ok, - TimedOut, - Exception, + InvalidResponse, DeadLetter } diff --git a/src/Proto.Cluster/ExperimentalClusterContext.cs b/src/Proto.Cluster/LegacyClusterContext.cs similarity index 53% rename from src/Proto.Cluster/ExperimentalClusterContext.cs rename to src/Proto.Cluster/LegacyClusterContext.cs index f12ac75f67..8c21de7ea6 100644 --- a/src/Proto.Cluster/ExperimentalClusterContext.cs +++ b/src/Proto.Cluster/LegacyClusterContext.cs @@ -1,5 +1,5 @@ // ----------------------------------------------------------------------- -// +// // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- @@ -10,11 +10,12 @@ using Microsoft.Extensions.Logging; using Proto.Cluster.Identity; using Proto.Cluster.Metrics; +using Proto.Future; using Proto.Utils; namespace Proto.Cluster; -public class ExperimentalClusterContext : IClusterContext +public class LegacyClusterContext : IClusterContext { private readonly IIdentityLookup _identityLookup; @@ -22,27 +23,35 @@ public class ExperimentalClusterContext : IClusterContext private readonly ShouldThrottle _requestLogThrottle; private readonly TaskClock _clock; private readonly ActorSystem _system; - private static readonly ILogger Logger = Log.CreateLogger(); - public ExperimentalClusterContext(Cluster cluster) + private static readonly ILogger Logger = Log.CreateLogger(); + + public LegacyClusterContext( + ActorSystem system, + IIdentityLookup identityLookup, + PidCache pidCache, + ClusterContextConfig config, + CancellationToken killSwitch + ) { - _identityLookup = cluster.IdentityLookup; - _pidCache = cluster.PidCache; - var config = cluster.Config; - _system = cluster.System; + _identityLookup = identityLookup; + _pidCache = pidCache; + _system = system; _requestLogThrottle = Throttle.Create( config.MaxNumberOfEventsInRequestLogThrottlePeriod, config.RequestLogThrottlePeriod, i => Logger.LogInformation("Throttled {LogCount} TryRequestAsync logs", i) ); - _clock = new TaskClock(config.ActorRequestTimeout, TimeSpan.FromSeconds(1), cluster.System.Shutdown); + + _clock = new TaskClock(config.ActorRequestTimeout, config.ActorRequestRetryInterval, killSwitch); _clock.Start(); } public async Task RequestAsync(ClusterIdentity clusterIdentity, object message, ISenderContext context, CancellationToken ct) { var start = Stopwatch.StartNew(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} Message {Message}", clusterIdentity, message); var i = 0; var future = context.GetFuture(); @@ -54,8 +63,6 @@ public ExperimentalClusterContext(Cluster cluster) { if (context.System.Shutdown.IsCancellationRequested) return default; - i++; - var source = PidSource.Cache; var pid = GetCachedPid(clusterIdentity); @@ -70,78 +77,37 @@ public ExperimentalClusterContext(Cluster cluster) if (pid is null) { if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Requesting {ClusterIdentity} - Did not get PID from IdentityLookup", clusterIdentity); - await Task.Delay(i * 20, CancellationToken.None); + await Task.Delay(++i * 20, CancellationToken.None); continue; } // Ensures that a future is not re-used against another actor. if (lastPid is not null && !pid.Equals(lastPid)) RefreshFuture(); - var t = Stopwatch.StartNew(); + // Logger.LogDebug("Requesting {ClusterIdentity} - Got PID {Pid} from {Source}", clusterIdentity, pid, source); + var (status, res) = await TryRequestAsync(clusterIdentity, message, pid, source, context, future); - try - { - context.Request(pid, message, future.Pid); - var task = future.Task; - - await Task.WhenAny(task, _clock.CurrentBucket); - - if (task.IsCompleted) - { - var (status, result) = ToResult(source, context, task.Result); - - switch (status) - { - case ResponseStatus.Ok: return result; - case ResponseStatus.InvalidResponse: - RefreshFuture(); - await RemoveFromSource(clusterIdentity, source, pid); - break; - case ResponseStatus.DeadLetter: - RefreshFuture(); - await RemoveFromSource(clusterIdentity, PidSource.Lookup, pid); - break; - } - } - else - { - if (!context.System.Shutdown.IsCancellationRequested) - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source); - _pidCache.RemoveByVal(clusterIdentity, pid); - } - } - catch (TimeoutException) - { - lastPid = pid; - await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); - continue; - } - catch (Exception x) - { - x.CheckFailFast(); - if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); - _pidCache.RemoveByVal(clusterIdentity, pid); - RefreshFuture(); - await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); - await Task.Delay(i * 20, CancellationToken.None); - continue; - } - finally + switch (status) { - if (context.System.Metrics.Enabled) - { - var elapsed = t.Elapsed; - ClusterMetrics.ClusterRequestDuration - .Record(elapsed.TotalSeconds, - new("id", _system.Id), new("address", _system.Address), - new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name), - new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup") - ); - } + case ResponseStatus.Ok: + return res; + + case ResponseStatus.Exception: + RefreshFuture(); + await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); + await Task.Delay(++i * 20, CancellationToken.None); + break; + case ResponseStatus.DeadLetter: + RefreshFuture(); + await RemoveFromSource(clusterIdentity, source, pid); + break; + case ResponseStatus.TimedOut: + lastPid = pid; + await RemoveFromSource(clusterIdentity, PidSource.Cache, pid); + break; } - if (context.System.Metrics.Enabled) + if (_system.Metrics.Enabled) { ClusterMetrics.ClusterRequestRetryCount.Add( 1, new("id", _system.Id), new("address", _system.Address), @@ -152,7 +118,8 @@ public ExperimentalClusterContext(Cluster cluster) if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) { - Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}, elapsed {Time}", clusterIdentity, start.Elapsed); + var t = start.Elapsed; + Logger.LogWarning("RequestAsync retried but failed for {ClusterIdentity}, elapsed {Time}", clusterIdentity, t); } return default!; @@ -177,9 +144,6 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou _pidCache.RemoveByVal(clusterIdentity, pid); } - - private PID? GetCachedPid(ClusterIdentity clusterIdentity) => _pidCache.TryGet(clusterIdentity, out var pid) ? pid : null; - private async Task GetPidFromLookup(ClusterIdentity clusterIdentity, ISenderContext context, CancellationToken ct) { try @@ -213,9 +177,81 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou } } - private static (ResponseStatus Ok, T?) ToResult(PidSource source, ISenderContext context, object result) + private async ValueTask<(ResponseStatus Status, T?)> TryRequestAsync( + ClusterIdentity clusterIdentity, + object message, + PID pid, + PidSource source, + ISenderContext context, + IFuture future + ) + { + var t = Stopwatch.StartNew(); + + try + { + context.Request(pid, message, future.Pid); + var task = future.Task; + + await Task.WhenAny(task, _clock.CurrentBucket); + + if (task.IsCompleted) + { + var res = task.Result; + + return ToResult(source, context, res); + } + + if (!context.System.Shutdown.IsCancellationRequested) + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("TryRequestAsync timed out, PID from {Source}", source); + + _pidCache.RemoveByVal(clusterIdentity, pid); + + return (ResponseStatus.TimedOut, default); + } + catch (TimeoutException) + { + return (ResponseStatus.TimedOut, default); + } + catch (Exception x) + { + x.CheckFailFast(); + if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); + _pidCache.RemoveByVal(clusterIdentity, pid); + return (ResponseStatus.Exception, default); + } + finally + { + if (context.System.Metrics.Enabled) + { + var elapsed = t.Elapsed; + ClusterMetrics.ClusterRequestDuration + .Record(elapsed.TotalSeconds, + new("id", _system.Id), new("address", _system.Address), + new("clusterkind", clusterIdentity.Kind), new("messagetype", message.GetType().Name), + new("pidsource", source == PidSource.Cache ? "PidCache" : "IIdentityLookup") + ); + } + } + } + + private PID? GetCachedPid(ClusterIdentity clusterIdentity) + { + var pid = clusterIdentity.CachedPid; + + if (pid is null && _pidCache.TryGet(clusterIdentity, out pid)) + { + clusterIdentity.CachedPid = pid; + } + + return pid; + } + + private static (ResponseStatus Status, T?) ToResult(PidSource source, ISenderContext context, object result) { var message = MessageEnvelope.UnwrapMessage(result); + switch (message) { case DeadLetterResponse: @@ -230,15 +266,16 @@ private static (ResponseStatus Ok, T?) ToResult(PidSource source, ISenderCont { return (ResponseStatus.Ok, (T) (object) MessageEnvelope.Wrap(result)); } - Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T)); - return (ResponseStatus.InvalidResponse, default); + Logger.LogWarning("Unexpected message. Was type {Type} but expected {ExpectedType}", message.GetType(), typeof(T)); + return (ResponseStatus.Exception, default); } } private enum ResponseStatus { Ok, - InvalidResponse, + TimedOut, + Exception, DeadLetter } diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index 61771e6aed..c94f6f679e 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -235,7 +235,7 @@ public class InMemoryClusterFixtureAlternativeClusterContext : BaseInMemoryClust { public InMemoryClusterFixtureAlternativeClusterContext() : base(3, config => config .WithActorRequestTimeout(TimeSpan.FromSeconds(4)) - .WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster)) + .WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)) ) { } @@ -245,7 +245,7 @@ public class InMemoryClusterFixtureSharedFutures : BaseInMemoryClusterFixture { public InMemoryClusterFixtureSharedFutures() : base(3, config => config .WithActorRequestTimeout(TimeSpan.FromSeconds(4)) - .WithClusterContextProducer(cluster => new ExperimentalClusterContext(cluster)) + .WithClusterContextProducer(cluster => new DefaultClusterContext(cluster)) ) { } diff --git a/tests/Proto.Cluster.Tests/PidCacheTests.cs b/tests/Proto.Cluster.Tests/PidCacheTests.cs index c26611eaae..9a13652630 100644 --- a/tests/Proto.Cluster.Tests/PidCacheTests.cs +++ b/tests/Proto.Cluster.Tests/PidCacheTests.cs @@ -44,7 +44,7 @@ public async Task PurgesPidCacheOnNullResponse() var logger = Log.CreateLogger("dummylog"); var clusterIdentity = new ClusterIdentity {Identity = "identity", Kind = "kind"}; pidCache.TryAdd(clusterIdentity, deadPid); - var requestAsyncStrategy = new DefaultClusterContext(system, dummyIdentityLookup, pidCache, new ClusterContextConfig(), system.Shutdown); + var requestAsyncStrategy = new LegacyClusterContext(system, dummyIdentityLookup, pidCache, new ClusterContextConfig(), system.Shutdown); var res = await requestAsyncStrategy.RequestAsync(clusterIdentity, new Ping {Message = "msg"}, system.Root, new CancellationTokenSource(6000).Token