Skip to content

Commit

Permalink
Make all catch Exception call CheckFailFast unless they rethrow (#1620)
Browse files Browse the repository at this point in the history
* make all catch Exception call CheckFailFast unless they rethrow
  • Loading branch information
rogeralsing committed May 27, 2022
1 parent 2e74b6a commit f47b751
Show file tree
Hide file tree
Showing 24 changed files with 75 additions and 47 deletions.
2 changes: 2 additions & 0 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ public Task Receive(MessageEnvelope envelope)

public void EscalateFailure(Exception reason, object? message)
{
reason.CheckFailFast();

if (System.Config.DeveloperSupervisionLogging)
{
Console.WriteLine($"[Supervision] Actor {Self} : {Actor.GetType().Name} failed with message:{message} exception:{reason}");
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public void Publish(T msg)
}
catch (Exception ex)
{
_logger.LogWarning(0, ex, "Exception has occurred when publishing a message.");
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
return Task.CompletedTask;
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Actor/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using JetBrains.Annotations;
using Proto.Mailbox;
Expand Down Expand Up @@ -31,4 +32,12 @@ out TValue value
key = self.Key;
value = self.Value;
}

public static void CheckFailFast(this Exception? reason)
{
if (reason is not OutOfMemoryException) return;

Console.WriteLine("[Fatal] Out of memory exception" + reason);
Environment.FailFast(reason.Message, reason);
}
}
1 change: 1 addition & 0 deletions src/Proto.Actor/Mailbox/BatchingMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private async Task RunAsync()
}
catch (Exception x)
{
x.CheckFailFast();
_suspended = true;
_invoker.EscalateFailure(x, currentMessage);
}
Expand Down
5 changes: 4 additions & 1 deletion src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private ValueTask ProcessMessages()

if (msg is not null)
{
var t= _invoker.InvokeUserMessageAsync(msg);
var t = _invoker.InvokeUserMessageAsync(msg);

if (!t.IsCompletedSuccessfully)
{
Expand All @@ -254,6 +254,7 @@ private ValueTask ProcessMessages()
}
catch (Exception e)
{
e.CheckFailFast();
_invoker.EscalateFailure(e, msg);
}
return default;
Expand All @@ -263,13 +264,15 @@ static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self)
try
{
await task;

foreach (var t1 in self._stats)
{
t1.MessageReceived(msg);
}
}
catch (Exception e)
{
e.CheckFailFast();
self._invoker.EscalateFailure(e, msg);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public void HandleFailure(
RestartStatistics rs,
Exception reason,
object? message
)
=> supervisor.RestartChildren(reason, child);
) => supervisor.RestartChildren(reason, child);
}
2 changes: 2 additions & 0 deletions src/Proto.Actor/Utils/Retry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static async Task<T> TryUntil<T>(Func<Task<T>> body, Func<T?,bool> condit
}
catch (Exception x)
{
x.CheckFailFast();
onError?.Invoke(i, x);

if (i == retryCount - 1)
Expand Down Expand Up @@ -105,6 +106,7 @@ public static async Task Try(
}
catch (Exception x)
{
x.CheckFailFast();
onError?.Invoke(i, x);

if (i == retryCount - 1)
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Actor/Utils/TaskFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static async Task Run(Func<Task> body, CancellationToken cancellationToke
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Unhandled exception in async job {Job}", name);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
catch (Exception e)
{
e.CheckFailFast();
if (context.System.Shutdown.IsCancellationRequested) return default;

if (_requestLogThrottle().IsOpen())
Expand Down Expand Up @@ -213,6 +214,7 @@ IFuture future
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/ExperimentalClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public ExperimentalClusterContext(Cluster cluster)
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
Expand Down Expand Up @@ -203,6 +204,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
catch (Exception e)
{
e.CheckFailFast();
if (context.System.Shutdown.IsCancellationRequested) return default;

if (_requestLogThrottle().IsOpen())
Expand Down
64 changes: 28 additions & 36 deletions src/Proto.Cluster/Gossip/GossipStateManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,53 +129,45 @@ Func<T, TV> extractValue
{
var logger = ctx?.Logger()?.BeginMethodScope();

try
if (state.Members.Count == 0)
{
if (state.Members.Count == 0)
{
logger?.LogDebug("No members found for consensus check");
return (false, default);
}
logger?.LogDebug("No members found for consensus check");
return (false, default);
}

logger?.LogDebug("Checking consensus");
logger?.LogDebug("Checking consensus");

if (!state.Members.TryGetValue(myId, out var ownMemberState))
{
logger?.LogDebug("I can't find myself");
return (false, default);
}
if (!state.Members.TryGetValue(myId, out var ownMemberState))
{
logger?.LogDebug("I can't find myself");
return (false, default);
}

var ownValue = GetConsensusValue(ownMemberState);

var ownValue = GetConsensusValue(ownMemberState);
if (ownValue is null)
{
logger?.LogDebug("I don't have any value for {Key}", valueKey);
return (false, default);
}

if (ownValue is null)
foreach (var (memberId, memberState) in state.Members)
{
//skip blocked members
if (!members.Contains(memberId))
{
logger?.LogDebug("I don't have any value for {Key}", valueKey);
return (false, default);
logger?.LogDebug("Member is not part of cluster {MemberId}", memberId);
continue;
}

foreach (var (memberId, memberState) in state.Members)
{
//skip blocked members
if (!members.Contains(memberId))
{
logger?.LogDebug("Member is not part of cluster {MemberId}", memberId);
continue;
}
var consensusValue = GetConsensusValue(memberState);

var consensusValue = GetConsensusValue(memberState);
if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default);
}

if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default);
}
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state);
return (true, ownValue);

if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state);
return (true, ownValue);
}
catch (Exception x)
{
logger?.LogError(x, "Check Consensus failed");
Logger.LogError(x, "Check Consensus failed");
return (false, default);
}

TV? GetConsensusValue(GossipState.Types.GossipMemberState memberState)
{
Expand Down
7 changes: 3 additions & 4 deletions src/Proto.Cluster/Gossip/Gossiper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private async Task GossipLoop()
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Gossip loop failed");
}
}
Expand Down Expand Up @@ -381,11 +382,9 @@ private async Task SendStateAsync()
catch (OperationCanceledException)
{
}
#pragma warning disable RCS1075
catch (Exception)
#pragma warning restore RCS1075
catch (Exception x)
{
//TODO: log
x.CheckFailFast();
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Proto.Cluster/Member/MemberList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ public void BroadcastEvent(object message, bool includeSelf = true)
{
_system.Root.Send(pid, message);
}
catch (Exception)
catch (Exception x)
{
Logger.LogError("[MemberList] Failed to broadcast {Message} to {Pid}", message, pid);
x.CheckFailFast();
Logger.LogError(x, "[MemberList] Failed to broadcast {Message} to {Pid}", message, pid);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ TaskCompletionSource<ActivationResponse> setResponse
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "[PartitionIdentity] Spawn failed");
_deltaTopology = null; // Do not use delta handover if we are not sure all spawns are OK.
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public PartitionIdentityLookup(PartitionConfig? config)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionIdentity] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, identityOwner);
return null;
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionIdentity] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity);
var response = new ActivationResponse
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionActivator] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, owner);
return null;
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private async Task PublisherLoop(CancellationToken cancel)
}
catch (Exception e)
{
e.CheckFailFast();
if (_logThrottle().IsOpen())
Logger.LogError(e, "Error in the publisher loop of Producer for topic {Topic}", _topic);

Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/Seed/SeedClientNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private async Task OnConnect(IContext context)
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
}
}
Expand Down Expand Up @@ -78,6 +79,7 @@ private async Task OnClusterTopology(IContext context, ClusterTopology clusterTo
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "Failed to connect to seed node {Member}", member.Address);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Seed/SeedNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private async Task OnConnect(IContext context)
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
}
catch (Exception ex)
{
ex.CheckFailFast();
if (_logger.IsEnabled(_deserializationErrorLogLevel))
_logger.Log(
_deserializationErrorLogLevel,
Expand Down Expand Up @@ -312,6 +313,7 @@ private async Task RunAsync()
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(ex, "[{SystemAddress}] Error in RunAsync", System.Address);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ await responseStream.WriteAsync(new RemoteMessage
}
).ConfigureAwait(false);
}
catch (Exception)
catch (Exception x)
{
x.CheckFailFast();
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
}
}
Expand Down Expand Up @@ -148,6 +149,7 @@ await responseStream.WriteAsync(new RemoteMessage
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Remote/Endpoints/RemoteMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)
}
catch (Exception ex)
{
ex.CheckFailFast();
if (_logger.IsEnabled(_deserializationErrorLogLevel))
_logger.Log(
_deserializationErrorLogLevel,
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Remote/Endpoints/ServerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage
}
catch (Exception e)
{
e.CheckFailFast();
if (actorSystemId is not null && _system.Remote().BlockList.IsBlocked(actorSystemId))
{
_logger.LogDebug("[ServerConnector][{SystemAddress}] dropped connection to blocked member {ActorSystemId}/{Address}", _system.Address, actorSystemId, _address);
Expand Down

0 comments on commit f47b751

Please sign in to comment.