diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs index 254d941546..2744d677ea 100644 --- a/src/Proto.Actor/Context/ActorContext.cs +++ b/src/Proto.Actor/Context/ActorContext.cs @@ -242,6 +242,8 @@ public Task Receive(MessageEnvelope envelope) public void EscalateFailure(Exception reason, object? message) { + reason.CheckFailFast(); + if (System.Config.DeveloperSupervisionLogging) { Console.WriteLine($"[Supervision] Actor {Self} : {Actor.GetType().Name} failed with message:{message} exception:{reason}"); diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index f0ae76041f..f6c744fb98 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -232,7 +232,8 @@ public void Publish(T msg) } catch (Exception ex) { - _logger.LogWarning(0, ex, "Exception has occurred when publishing a message."); + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); } return Task.CompletedTask; diff --git a/src/Proto.Actor/Extensions.cs b/src/Proto.Actor/Extensions.cs index 518bf29695..3ebe0feb7c 100644 --- a/src/Proto.Actor/Extensions.cs +++ b/src/Proto.Actor/Extensions.cs @@ -3,6 +3,7 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; using System.Collections.Generic; using JetBrains.Annotations; using Proto.Mailbox; @@ -31,4 +32,12 @@ out TValue value key = self.Key; value = self.Value; } + + public static void CheckFailFast(this Exception? reason) + { + if (reason is not OutOfMemoryException) return; + + Console.WriteLine("[Fatal] Out of memory exception" + reason); + Environment.FailFast(reason.Message, reason); + } } \ No newline at end of file diff --git a/src/Proto.Actor/Mailbox/BatchingMailbox.cs b/src/Proto.Actor/Mailbox/BatchingMailbox.cs index f11002440f..38d0afa414 100644 --- a/src/Proto.Actor/Mailbox/BatchingMailbox.cs +++ b/src/Proto.Actor/Mailbox/BatchingMailbox.cs @@ -91,6 +91,7 @@ private async Task RunAsync() } catch (Exception x) { + x.CheckFailFast(); _suspended = true; _invoker.EscalateFailure(x, currentMessage); } diff --git a/src/Proto.Actor/Mailbox/Mailbox.cs b/src/Proto.Actor/Mailbox/Mailbox.cs index dcfec6ff79..b3394ef912 100644 --- a/src/Proto.Actor/Mailbox/Mailbox.cs +++ b/src/Proto.Actor/Mailbox/Mailbox.cs @@ -236,7 +236,7 @@ private ValueTask ProcessMessages() if (msg is not null) { - var t= _invoker.InvokeUserMessageAsync(msg); + var t = _invoker.InvokeUserMessageAsync(msg); if (!t.IsCompletedSuccessfully) { @@ -254,6 +254,7 @@ private ValueTask ProcessMessages() } catch (Exception e) { + e.CheckFailFast(); _invoker.EscalateFailure(e, msg); } return default; @@ -263,6 +264,7 @@ static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self) try { await task; + foreach (var t1 in self._stats) { t1.MessageReceived(msg); @@ -270,6 +272,7 @@ static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self) } catch (Exception e) { + e.CheckFailFast(); self._invoker.EscalateFailure(e, msg); } } diff --git a/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs b/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs index f2045b6036..578e433e2c 100644 --- a/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs +++ b/src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs @@ -17,6 +17,5 @@ public void HandleFailure( RestartStatistics rs, Exception reason, object? message - ) - => supervisor.RestartChildren(reason, child); + ) => supervisor.RestartChildren(reason, child); } \ No newline at end of file diff --git a/src/Proto.Actor/Utils/Retry.cs b/src/Proto.Actor/Utils/Retry.cs index dc1bc0a375..e673e3f534 100644 --- a/src/Proto.Actor/Utils/Retry.cs +++ b/src/Proto.Actor/Utils/Retry.cs @@ -43,6 +43,7 @@ public static async Task TryUntil(Func> body, Func condit } catch (Exception x) { + x.CheckFailFast(); onError?.Invoke(i, x); if (i == retryCount - 1) @@ -105,6 +106,7 @@ public static async Task Try( } catch (Exception x) { + x.CheckFailFast(); onError?.Invoke(i, x); if (i == retryCount - 1) diff --git a/src/Proto.Actor/Utils/TaskFactory.cs b/src/Proto.Actor/Utils/TaskFactory.cs index 995c76ea08..7623a6c713 100644 --- a/src/Proto.Actor/Utils/TaskFactory.cs +++ b/src/Proto.Actor/Utils/TaskFactory.cs @@ -29,6 +29,7 @@ public static async Task Run(Func body, CancellationToken cancellationToke } catch (Exception x) { + x.CheckFailFast(); Logger.LogError(x, "Unhandled exception in async job {Job}", name); } } diff --git a/src/Proto.Cluster/DefaultClusterContext.cs b/src/Proto.Cluster/DefaultClusterContext.cs index 4c88d91e91..280e24182a 100644 --- a/src/Proto.Cluster/DefaultClusterContext.cs +++ b/src/Proto.Cluster/DefaultClusterContext.cs @@ -168,6 +168,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou } catch (Exception e) { + e.CheckFailFast(); if (context.System.Shutdown.IsCancellationRequested) return default; if (_requestLogThrottle().IsOpen()) @@ -213,6 +214,7 @@ IFuture future } catch (Exception x) { + x.CheckFailFast(); if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); _pidCache.RemoveByVal(clusterIdentity, pid); diff --git a/src/Proto.Cluster/ExperimentalClusterContext.cs b/src/Proto.Cluster/ExperimentalClusterContext.cs index e6b1b0a275..dfcaebfd5f 100644 --- a/src/Proto.Cluster/ExperimentalClusterContext.cs +++ b/src/Proto.Cluster/ExperimentalClusterContext.cs @@ -118,6 +118,7 @@ public ExperimentalClusterContext(Cluster cluster) } catch (Exception x) { + x.CheckFailFast(); if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen()) if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source); _pidCache.RemoveByVal(clusterIdentity, pid); @@ -203,6 +204,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou } catch (Exception e) { + e.CheckFailFast(); if (context.System.Shutdown.IsCancellationRequested) return default; if (_requestLogThrottle().IsOpen()) diff --git a/src/Proto.Cluster/Gossip/GossipStateManagement.cs b/src/Proto.Cluster/Gossip/GossipStateManagement.cs index 468ec5c31f..c3fab5fa4b 100644 --- a/src/Proto.Cluster/Gossip/GossipStateManagement.cs +++ b/src/Proto.Cluster/Gossip/GossipStateManagement.cs @@ -129,53 +129,45 @@ Func extractValue { var logger = ctx?.Logger()?.BeginMethodScope(); - try + if (state.Members.Count == 0) { - if (state.Members.Count == 0) - { - logger?.LogDebug("No members found for consensus check"); - return (false, default); - } + logger?.LogDebug("No members found for consensus check"); + return (false, default); + } - logger?.LogDebug("Checking consensus"); + logger?.LogDebug("Checking consensus"); - if (!state.Members.TryGetValue(myId, out var ownMemberState)) - { - logger?.LogDebug("I can't find myself"); - return (false, default); - } + if (!state.Members.TryGetValue(myId, out var ownMemberState)) + { + logger?.LogDebug("I can't find myself"); + return (false, default); + } + + var ownValue = GetConsensusValue(ownMemberState); - var ownValue = GetConsensusValue(ownMemberState); + if (ownValue is null) + { + logger?.LogDebug("I don't have any value for {Key}", valueKey); + return (false, default); + } - if (ownValue is null) + foreach (var (memberId, memberState) in state.Members) + { + //skip blocked members + if (!members.Contains(memberId)) { - logger?.LogDebug("I don't have any value for {Key}", valueKey); - return (false, default); + logger?.LogDebug("Member is not part of cluster {MemberId}", memberId); + continue; } - foreach (var (memberId, memberState) in state.Members) - { - //skip blocked members - if (!members.Contains(memberId)) - { - logger?.LogDebug("Member is not part of cluster {MemberId}", memberId); - continue; - } + var consensusValue = GetConsensusValue(memberState); - var consensusValue = GetConsensusValue(memberState); + if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default); + } - if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default); - } + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state); + return (true, ownValue); - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state); - return (true, ownValue); - } - catch (Exception x) - { - logger?.LogError(x, "Check Consensus failed"); - Logger.LogError(x, "Check Consensus failed"); - return (false, default); - } TV? GetConsensusValue(GossipState.Types.GossipMemberState memberState) { diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index fea9408506..33e7021d08 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -182,6 +182,7 @@ private async Task GossipLoop() } catch (Exception x) { + x.CheckFailFast(); Logger.LogError(x, "Gossip loop failed"); } } @@ -381,11 +382,9 @@ private async Task SendStateAsync() catch (OperationCanceledException) { } -#pragma warning disable RCS1075 - catch (Exception) -#pragma warning restore RCS1075 + catch (Exception x) { - //TODO: log + x.CheckFailFast(); } } diff --git a/src/Proto.Cluster/Member/MemberList.cs b/src/Proto.Cluster/Member/MemberList.cs index 6915b10fe9..a12c101db1 100644 --- a/src/Proto.Cluster/Member/MemberList.cs +++ b/src/Proto.Cluster/Member/MemberList.cs @@ -310,9 +310,10 @@ public void BroadcastEvent(object message, bool includeSelf = true) { _system.Root.Send(pid, message); } - catch (Exception) + catch (Exception x) { - Logger.LogError("[MemberList] Failed to broadcast {Message} to {Pid}", message, pid); + x.CheckFailFast(); + Logger.LogError(x, "[MemberList] Failed to broadcast {Message} to {Pid}", message, pid); } } } diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs index 93c15ba428..eadfc98583 100644 --- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs +++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs @@ -616,6 +616,7 @@ TaskCompletionSource setResponse } catch (Exception x) { + x.CheckFailFast(); Logger.LogError(x, "[PartitionIdentity] Spawn failed"); _deltaTopology = null; // Do not use delta handover if we are not sure all spawns are OK. } diff --git a/src/Proto.Cluster/Partition/PartitionIdentityLookup.cs b/src/Proto.Cluster/Partition/PartitionIdentityLookup.cs index 713209495f..3692fbef52 100644 --- a/src/Proto.Cluster/Partition/PartitionIdentityLookup.cs +++ b/src/Proto.Cluster/Partition/PartitionIdentityLookup.cs @@ -109,6 +109,7 @@ public PartitionIdentityLookup(PartitionConfig? config) } catch (Exception e) { + e.CheckFailFast(); Logger.LogError(e, "[PartitionIdentity] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, identityOwner); return null; } diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index 0f8a184dfc..9df345afbe 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -345,6 +345,7 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg) } catch (Exception e) { + e.CheckFailFast(); Logger.LogError(e, "[PartitionIdentity] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity); var response = new ActivationResponse { diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs index f23d6f3bb0..0bb64cd140 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorLookup.cs @@ -66,6 +66,7 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout) } catch (Exception e) { + e.CheckFailFast(); Logger.LogError(e, "[PartitionActivator] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, owner); return null; } diff --git a/src/Proto.Cluster/PubSub/BatchingProducer.cs b/src/Proto.Cluster/PubSub/BatchingProducer.cs index c73190dcfd..13a1c72200 100644 --- a/src/Proto.Cluster/PubSub/BatchingProducer.cs +++ b/src/Proto.Cluster/PubSub/BatchingProducer.cs @@ -102,6 +102,7 @@ private async Task PublisherLoop(CancellationToken cancel) } catch (Exception e) { + e.CheckFailFast(); if (_logThrottle().IsOpen()) Logger.LogError(e, "Error in the publisher loop of Producer for topic {Topic}", _topic); diff --git a/src/Proto.Cluster/Seed/SeedClientNodeActor.cs b/src/Proto.Cluster/Seed/SeedClientNodeActor.cs index ad37848c7a..bd9df2b5f7 100644 --- a/src/Proto.Cluster/Seed/SeedClientNodeActor.cs +++ b/src/Proto.Cluster/Seed/SeedClientNodeActor.cs @@ -48,6 +48,7 @@ private async Task OnConnect(IContext context) } catch (Exception x) { + x.CheckFailFast(); Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port); } } @@ -78,6 +79,7 @@ private async Task OnClusterTopology(IContext context, ClusterTopology clusterTo } catch (Exception e) { + e.CheckFailFast(); Logger.LogError(e, "Failed to connect to seed node {Member}", member.Address); } } diff --git a/src/Proto.Cluster/Seed/SeedNodeActor.cs b/src/Proto.Cluster/Seed/SeedNodeActor.cs index 40ac5a97e5..cabb216c59 100644 --- a/src/Proto.Cluster/Seed/SeedNodeActor.cs +++ b/src/Proto.Cluster/Seed/SeedNodeActor.cs @@ -61,6 +61,7 @@ private async Task OnConnect(IContext context) } catch (Exception x) { + x.CheckFailFast(); Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port); } } diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index 3f1e410842..470cda54ed 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -138,6 +138,7 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage) } catch (Exception ex) { + ex.CheckFailFast(); if (_logger.IsEnabled(_deserializationErrorLogLevel)) _logger.Log( _deserializationErrorLogLevel, @@ -312,6 +313,7 @@ private async Task RunAsync() } catch (Exception ex) { + ex.CheckFailFast(); _logger.LogError(ex, "[{SystemAddress}] Error in RunAsync", System.Address); } } diff --git a/src/Proto.Remote/Endpoints/EndpointReader.cs b/src/Proto.Remote/Endpoints/EndpointReader.cs index 4cb1774c95..864b91222d 100644 --- a/src/Proto.Remote/Endpoints/EndpointReader.cs +++ b/src/Proto.Remote/Endpoints/EndpointReader.cs @@ -51,8 +51,9 @@ await responseStream.WriteAsync(new RemoteMessage } ).ConfigureAwait(false); } - catch (Exception) + catch (Exception x) { + x.CheckFailFast(); Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address); } } @@ -148,6 +149,7 @@ await responseStream.WriteAsync(new RemoteMessage } catch (Exception e) { + e.CheckFailFast(); Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId); } } diff --git a/src/Proto.Remote/Endpoints/RemoteMessageHandler.cs b/src/Proto.Remote/Endpoints/RemoteMessageHandler.cs index 9f3067714a..c5b06c6927 100644 --- a/src/Proto.Remote/Endpoints/RemoteMessageHandler.cs +++ b/src/Proto.Remote/Endpoints/RemoteMessageHandler.cs @@ -85,6 +85,7 @@ public void HandleRemoteMessage(RemoteMessage currentMessage) } catch (Exception ex) { + ex.CheckFailFast(); if (_logger.IsEnabled(_deserializationErrorLogLevel)) _logger.Log( _deserializationErrorLogLevel, diff --git a/src/Proto.Remote/Endpoints/ServerConnector.cs b/src/Proto.Remote/Endpoints/ServerConnector.cs index 6e43a13eb1..34a664c37f 100644 --- a/src/Proto.Remote/Endpoints/ServerConnector.cs +++ b/src/Proto.Remote/Endpoints/ServerConnector.cs @@ -246,6 +246,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage } catch (Exception e) { + e.CheckFailFast(); if (actorSystemId is not null && _system.Remote().BlockList.IsBlocked(actorSystemId)) { _logger.LogDebug("[ServerConnector][{SystemAddress}] dropped connection to blocked member {ActorSystemId}/{Address}", _system.Address, actorSystemId, _address);