Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all catch Exception call CheckFailFast unless they rethrow #1620

Merged
merged 9 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ public Task Receive(MessageEnvelope envelope)

public void EscalateFailure(Exception reason, object? message)
{
reason.CheckFailFast();

if (System.Config.DeveloperSupervisionLogging)
{
Console.WriteLine($"[Supervision] Actor {Self} : {Actor.GetType().Name} failed with message:{message} exception:{reason}");
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public void Publish(T msg)
}
catch (Exception ex)
{
_logger.LogWarning(0, ex, "Exception has occurred when publishing a message.");
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}

return Task.CompletedTask;
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Actor/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using JetBrains.Annotations;
using Proto.Mailbox;
Expand Down Expand Up @@ -31,4 +32,12 @@ out TValue value
key = self.Key;
value = self.Value;
}

/// <summary>
/// Terminates the process via <see cref="Environment.FailFast(string, Exception)"/> when
/// <paramref name="reason"/> is an <see cref="OutOfMemoryException"/>. For any other
/// exception, or for <c>null</c>, the method returns without doing anything.
/// </summary>
/// <param name="reason">The exception caught by the caller; may be <c>null</c>.</param>
/// <remarks>
/// Only the exception itself is inspected; inner exceptions are not examined.
/// </remarks>
public static void CheckFailFast(this Exception? reason)
{
    if (reason is not OutOfMemoryException) return;

    try
    {
        // Formatting the exception allocates, which can itself fail while memory is exhausted.
        Console.WriteLine("[Fatal] Out of memory exception: " + reason);
    }
    catch
    {
        // Reporting is best-effort only: a failure to write the notice must not
        // prevent the process from being terminated below.
    }

    Environment.FailFast(reason.Message, reason);
}
}
1 change: 1 addition & 0 deletions src/Proto.Actor/Mailbox/BatchingMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private async Task RunAsync()
}
catch (Exception x)
{
x.CheckFailFast();
_suspended = true;
_invoker.EscalateFailure(x, currentMessage);
}
Expand Down
5 changes: 4 additions & 1 deletion src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private ValueTask ProcessMessages()

if (msg is not null)
{
var t= _invoker.InvokeUserMessageAsync(msg);
var t = _invoker.InvokeUserMessageAsync(msg);

if (!t.IsCompletedSuccessfully)
{
Expand All @@ -254,6 +254,7 @@ private ValueTask ProcessMessages()
}
catch (Exception e)
{
e.CheckFailFast();
_invoker.EscalateFailure(e, msg);
}
return default;
Expand All @@ -263,13 +264,15 @@ static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self)
try
{
await task;

foreach (var t1 in self._stats)
{
t1.MessageReceived(msg);
}
}
catch (Exception e)
{
e.CheckFailFast();
self._invoker.EscalateFailure(e, msg);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/Proto.Actor/Supervision/AlwaysRestartStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public void HandleFailure(
RestartStatistics rs,
Exception reason,
object? message
)
=> supervisor.RestartChildren(reason, child);
) => supervisor.RestartChildren(reason, child);
}
2 changes: 2 additions & 0 deletions src/Proto.Actor/Utils/Retry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static async Task<T> TryUntil<T>(Func<Task<T>> body, Func<T?,bool> condit
}
catch (Exception x)
{
x.CheckFailFast();
onError?.Invoke(i, x);

if (i == retryCount - 1)
Expand Down Expand Up @@ -105,6 +106,7 @@ public static async Task Try(
}
catch (Exception x)
{
x.CheckFailFast();
onError?.Invoke(i, x);

if (i == retryCount - 1)
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Actor/Utils/TaskFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static async Task Run(Func<Task> body, CancellationToken cancellationToke
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Unhandled exception in async job {Job}", name);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
catch (Exception e)
{
e.CheckFailFast();
if (context.System.Shutdown.IsCancellationRequested) return default;

if (_requestLogThrottle().IsOpen())
Expand Down Expand Up @@ -213,6 +214,7 @@ IFuture future
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/ExperimentalClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public ExperimentalClusterContext(Cluster cluster)
}
catch (Exception x)
{
x.CheckFailFast();
if (!context.System.Shutdown.IsCancellationRequested && _requestLogThrottle().IsOpen())
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug(x, "TryRequestAsync failed with exception, PID from {Source}", source);
_pidCache.RemoveByVal(clusterIdentity, pid);
Expand Down Expand Up @@ -203,6 +204,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
}
catch (Exception e)
{
e.CheckFailFast();
if (context.System.Shutdown.IsCancellationRequested) return default;

if (_requestLogThrottle().IsOpen())
Expand Down
64 changes: 28 additions & 36 deletions src/Proto.Cluster/Gossip/GossipStateManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,53 +129,45 @@ Func<T, TV> extractValue
{
var logger = ctx?.Logger()?.BeginMethodScope();

try
if (state.Members.Count == 0)
{
if (state.Members.Count == 0)
{
logger?.LogDebug("No members found for consensus check");
return (false, default);
}
logger?.LogDebug("No members found for consensus check");
return (false, default);
}

logger?.LogDebug("Checking consensus");
logger?.LogDebug("Checking consensus");

if (!state.Members.TryGetValue(myId, out var ownMemberState))
{
logger?.LogDebug("I can't find myself");
return (false, default);
}
if (!state.Members.TryGetValue(myId, out var ownMemberState))
{
logger?.LogDebug("I can't find myself");
return (false, default);
}

var ownValue = GetConsensusValue(ownMemberState);

var ownValue = GetConsensusValue(ownMemberState);
if (ownValue is null)
{
logger?.LogDebug("I don't have any value for {Key}", valueKey);
return (false, default);
}

if (ownValue is null)
foreach (var (memberId, memberState) in state.Members)
{
//skip blocked members
if (!members.Contains(memberId))
{
logger?.LogDebug("I don't have any value for {Key}", valueKey);
return (false, default);
logger?.LogDebug("Member is not part of cluster {MemberId}", memberId);
continue;
}

foreach (var (memberId, memberState) in state.Members)
{
//skip blocked members
if (!members.Contains(memberId))
{
logger?.LogDebug("Member is not part of cluster {MemberId}", memberId);
continue;
}
var consensusValue = GetConsensusValue(memberState);

var consensusValue = GetConsensusValue(memberState);
if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default);
}

if (consensusValue is null || !ownValue.Equals(consensusValue)) return (false, default);
}
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state);
return (true, ownValue);

if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug("Reached Consensus {Key}:{Value} - {State}", valueKey, ownValue, state);
return (true, ownValue);
}
catch (Exception x)
{
logger?.LogError(x, "Check Consensus failed");
Logger.LogError(x, "Check Consensus failed");
return (false, default);
}

TV? GetConsensusValue(GossipState.Types.GossipMemberState memberState)
{
Expand Down
7 changes: 3 additions & 4 deletions src/Proto.Cluster/Gossip/Gossiper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private async Task GossipLoop()
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Gossip loop failed");
}
}
Expand Down Expand Up @@ -381,11 +382,9 @@ private async Task SendStateAsync()
catch (OperationCanceledException)
{
}
#pragma warning disable RCS1075
catch (Exception)
#pragma warning restore RCS1075
catch (Exception x)
{
//TODO: log
x.CheckFailFast();
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Proto.Cluster/Member/MemberList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ public void BroadcastEvent(object message, bool includeSelf = true)
{
_system.Root.Send(pid, message);
}
catch (Exception)
catch (Exception x)
{
Logger.LogError("[MemberList] Failed to broadcast {Message} to {Pid}", message, pid);
x.CheckFailFast();
Logger.LogError(x, "[MemberList] Failed to broadcast {Message} to {Pid}", message, pid);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ TaskCompletionSource<ActivationResponse> setResponse
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "[PartitionIdentity] Spawn failed");
_deltaTopology = null; // Do not use delta handover if we are not sure all spawns are OK.
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public PartitionIdentityLookup(PartitionConfig? config)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionIdentity] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, identityOwner);
return null;
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionIdentity] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity);
var response = new ActivationResponse
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout)
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "[PartitionActivator] Error occured requesting remote PID {@Request}, identity Owner {Owner}", req, owner);
return null;
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private async Task PublisherLoop(CancellationToken cancel)
}
catch (Exception e)
{
e.CheckFailFast();
if (_logThrottle().IsOpen())
Logger.LogError(e, "Error in the publisher loop of Producer for topic {Topic}", _topic);

Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/Seed/SeedClientNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private async Task OnConnect(IContext context)
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
}
}
Expand Down Expand Up @@ -78,6 +79,7 @@ private async Task OnClusterTopology(IContext context, ClusterTopology clusterTo
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogError(e, "Failed to connect to seed node {Member}", member.Address);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Seed/SeedNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private async Task OnConnect(IContext context)
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogError(x, "Failed to connect to seed node {Host}:{Port}", host, port);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private int DropMessagesInBatch(RemoteMessage remoteMessage)
}
catch (Exception ex)
{
ex.CheckFailFast();
if (_logger.IsEnabled(_deserializationErrorLogLevel))
_logger.Log(
_deserializationErrorLogLevel,
Expand Down Expand Up @@ -312,6 +313,7 @@ private async Task RunAsync()
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(ex, "[{SystemAddress}] Error in RunAsync", System.Address);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ await responseStream.WriteAsync(new RemoteMessage
}
).ConfigureAwait(false);
}
catch (Exception)
catch (Exception x)
{
x.CheckFailFast();
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
}
}
Expand Down Expand Up @@ -148,6 +149,7 @@ await responseStream.WriteAsync(new RemoteMessage
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Remote/Endpoints/RemoteMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void HandleRemoteMessage(RemoteMessage currentMessage)
}
catch (Exception ex)
{
ex.CheckFailFast();
if (_logger.IsEnabled(_deserializationErrorLogLevel))
_logger.Log(
_deserializationErrorLogLevel,
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Remote/Endpoints/ServerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage
}
catch (Exception e)
{
e.CheckFailFast();
if (actorSystemId is not null && _system.Remote().BlockList.IsBlocked(actorSystemId))
{
_logger.LogDebug("[ServerConnector][{SystemAddress}] dropped connection to blocked member {ActorSystemId}/{Address}", _system.Address, actorSystemId, _address);
Expand Down