From dcc63738b41e6d35421025887265679945a4186f Mon Sep 17 00:00:00 2001 From: zbynek001 Date: Wed, 31 Mar 2021 18:37:37 +0200 Subject: [PATCH] Supress ActorSelectionMessage with DeadLetterSuppression (migrated from https://github.com/akka/akka/pull/28341) * for example the Cluster InitJoin message is marked with DeadLetterSuppression but was anyway logged because sent with actorSelection * for other WrappedMessage than ActorSelectionMessage we shouldn't unwrap and publish the inner in SuppressedDeadLetter because that might loose some information * therefore those are silenced in the DeadLetterListener instead Better deadLetter logging of wrapped messages (migrated from https://github.com/akka/akka/pull/28253) Logging of UnhandledMessage (migrated from https://github.com/akka/akka/pull/28414) * make use of the existing logging of dead letter also for UnhandledMessage Add Dropped to Akka.Actor (migrated partially from https://github.com/akka/akka/pull/27160) Log Dropped from DeadLetterListener --- .../ShardedDaemonProcess.cs | 2 +- .../PublishSubscribe/DistributedMessages.cs | 6 +- ...PISpec.ApproveClusterSharding.approved.txt | 2 +- ...reAPISpec.ApproveClusterTools.approved.txt | 6 +- .../CoreAPISpec.ApproveCore.approved.txt | 25 +- .../Actor/DeadLetterSupressionSpec.cs | 29 ++ .../Actor/DeadLetterSuspensionSpec.cs | 68 ++++- src/core/Akka.Tests/Event/EventStreamSpec.cs | 1 + src/core/Akka/Actor/ActorSelection.cs | 34 +-- src/core/Akka/Actor/BuiltInActors.cs | 47 +++- src/core/Akka/Actor/EmptyLocalActorRef.cs | 34 ++- src/core/Akka/Configuration/Pigeon.conf | 247 +++++++++--------- src/core/Akka/Event/DeadLetter.cs | 29 +- src/core/Akka/Event/DeadLetterListener.cs | 121 ++++++--- src/core/Akka/Event/UnhandledMessage.cs | 23 +- src/core/Akka/Routing/ConsistentHashRouter.cs | 2 +- 16 files changed, 422 insertions(+), 254 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs index 3609c6b36ba..01d9983ac94 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -78,7 +78,7 @@ public MessageExtractor(int maxNumberOfShards) /// and have the message types themselves carry identifiers. /// /// - public sealed class ShardingEnvelope + public sealed class ShardingEnvelope: IWrappedMessage { public string EntityId { get; } public object Message { get; } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs index 988eec784da..4d4ec65efbc 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedMessages.cs @@ -353,7 +353,7 @@ public override string ToString() /// TBD /// [Serializable] - public sealed class Publish : IDistributedPubSubMessage, IEquatable + public sealed class Publish : IDistributedPubSubMessage, IEquatable, IWrappedMessage { /// /// TBD @@ -420,7 +420,7 @@ public override string ToString() /// TBD /// [Serializable] - public sealed class Send : IDistributedPubSubMessage, IEquatable + public sealed class Send : IDistributedPubSubMessage, IEquatable, IWrappedMessage { /// /// TBD @@ -487,7 +487,7 @@ public override string ToString() /// TBD /// [Serializable] - public sealed class SendToAll : IDistributedPubSubMessage, IEquatable + public sealed class SendToAll : IDistributedPubSubMessage, IEquatable, IWrappedMessage { /// /// TBD diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt index dc5aac9abde..03714456bf2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt @@ -253,7 +253,7 @@ namespace Akka.Cluster.Sharding public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithRole(string role) { } public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithShardingSettings(Akka.Cluster.Sharding.ClusterShardingSettings shardingSettings) { } } - public sealed class ShardingEnvelope + public sealed class ShardingEnvelope : Akka.Actor.IWrappedMessage { public ShardingEnvelope(string entityId, object message) { } public string EntityId { get; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt index 60b0840e284..df676d7749b 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt @@ -229,7 +229,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe public static Akka.Cluster.Tools.PublishSubscribe.GetTopics Instance { get; } } public interface IDistributedPubSubMessage { } - public sealed class Publish : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable + public sealed class Publish : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable { public Publish(string topic, object message, bool sendOneMessageToEachGroup = False) { } public object Message { get; } @@ -258,7 +258,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe public override int GetHashCode() { } public override string ToString() { } } - public sealed class Send : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable + public sealed class Send : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable { public Send(string path, object message, bool localAffinity = False) { } public bool LocalAffinity { get; } @@ -269,7 +269,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe public override int GetHashCode() { } public override string ToString() { } } - public sealed class SendToAll : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable + public sealed class SendToAll : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable { public SendToAll(string path, object message, bool excludeSelf = False) { } public bool ExcludeSelf { get; } diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 7d9a54420db..f0e808d26ce 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -320,7 +320,7 @@ namespace Akka.Actor public void Tell(object message, Akka.Actor.IActorRef sender = null) { } public override string ToString() { } } - public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful + public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Actor.IWrappedMessage { public ActorSelectionMessage(object message, Akka.Actor.SelectionPathElement[] elements, bool wildCardFanOut = False) { } public Akka.Actor.SelectionPathElement[] Elements { get; } @@ -1145,6 +1145,10 @@ namespace Akka.Actor Akka.Actor.ITimerScheduler Timers { get; set; } } public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue { } + public interface IWrappedMessage + { + object Message { get; } + } public sealed class Identify : Akka.Actor.IAutoReceivedMessage, Akka.Actor.INotInfluenceReceiveTimeout { public Identify(object messageId) { } @@ -1797,6 +1801,10 @@ namespace Akka.Actor protected void RunTask(System.Func action) { } } public delegate void UntypedReceive(object message); + public class static WrappedMessage + { + public static object Unwrap(object message) { } + } } namespace Akka.Actor.Dsl { @@ -2879,7 +2887,7 @@ namespace Akka.Event { protected ActorEventBus() { } } - public abstract class AllDeadLetters + public abstract class AllDeadLetters : Akka.Actor.IWrappedMessage { protected AllDeadLetters(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { } public object Message { get; } @@ -2933,6 +2941,12 @@ namespace Akka.Event protected virtual void Print(Akka.Event.LogEvent logEvent) { } protected override bool Receive(object message) { } } + public sealed class Dropped : Akka.Event.AllDeadLetters + { + public Dropped(object message, string reason, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { } + public Dropped(object message, string reason, Akka.Actor.IActorRef recipient) { } + public string Reason { get; } + } public class DummyClassForStringSources { public DummyClassForStringSources() { } @@ -3155,12 +3169,9 @@ namespace Akka.Event public TraceLogger() { } protected override void OnReceive(object message) { } } - public sealed class UnhandledMessage + public sealed class UnhandledMessage : Akka.Event.AllDeadLetters, Akka.Actor.IWrappedMessage { public UnhandledMessage(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { } - public object Message { get; } - public Akka.Actor.IActorRef Recipient { get; } - public Akka.Actor.IActorRef Sender { get; } } public class Warning : Akka.Event.LogEvent { @@ -4088,7 +4099,7 @@ namespace Akka.Routing public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { } } } - public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Routing.IConsistentHashable + public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Actor.IWrappedMessage, Akka.Routing.IConsistentHashable { public ConsistentHashableEnvelope(object message, object hashKey) { } public object ConsistentHashKey { get; } diff --git a/src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs b/src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs index 8e0a3604dee..ed1ef87e6ec 100644 --- a/src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs +++ b/src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs @@ -82,6 +82,19 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead allDeadLetter.Recipient.Should().Be(deadActor); allListener.ExpectNoMsg(200.Milliseconds()); + + // unwrap for ActorSelection + Sys.ActorSelection(deadActor.Path).Tell(new SuppressedMessage()); + Sys.ActorSelection(deadActor.Path).Tell(new NormalMessage()); + + // the recipient ref isn't the same as deadActor here so only checking the message + deadLetter = deadListener.ExpectMsg();// + deadLetter.Message.Should().BeOfType(); + suppressedDeadLetter = suppressedListener.ExpectMsg(); + suppressedDeadLetter.Message.Should().BeOfType(); + + deadListener.ExpectNoMsg(200.Milliseconds()); + suppressedListener.ExpectNoMsg(200.Milliseconds()); } [Fact] @@ -123,6 +136,22 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead deadListener.ExpectNoMsg(TimeSpan.Zero); suppressedListener.ExpectNoMsg(TimeSpan.Zero); allListener.ExpectNoMsg(TimeSpan.Zero); + + // unwrap for ActorSelection + Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new SuppressedMessage()); + Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new NormalMessage()); + + deadLetter = deadListener.ExpectMsg(); + deadLetter.Message.Should().BeOfType(); + deadLetter.Sender.Should().Be(TestActor); + deadLetter.Recipient.Should().Be(Sys.DeadLetters); + suppressedDeadLetter = suppressedListener.ExpectMsg(); + suppressedDeadLetter.Message.Should().BeOfType(); + suppressedDeadLetter.Sender.Should().Be(TestActor); + suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters); + + deadListener.ExpectNoMsg(200.Milliseconds()); + suppressedListener.ExpectNoMsg(200.Milliseconds()); } } } diff --git a/src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs b/src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs index 35af70cf1df..96b7aff19e9 100644 --- a/src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs +++ b/src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs @@ -8,6 +8,7 @@ using System.Threading; using Akka.Actor; using Akka.Configuration; +using Akka.Event; using Akka.TestKit; using Xunit; @@ -15,12 +16,46 @@ namespace Akka.Tests.Actor { public class DeadLetterSuspensionSpec : AkkaSpec { + private class Dropping : ActorBase + { + public static Props Props() => Akka.Actor.Props.Create(() => new Dropping()); + + protected override bool Receive(object message) + { + switch (message) + { + case int n: + Context.System.EventStream.Publish(new Dropped(n, "Don't like numbers", Self)); + return true; + } + return false; + } + } + + private class Unandled : ActorBase + { + public static Props Props() => Akka.Actor.Props.Create(() => new Unandled()); + + protected override bool Receive(object message) + { + switch (message) + { + case int n: + Unhandled(n); + return true; + } + return false; + } + } + private static readonly Config Config = ConfigurationFactory.ParseString(@" akka.loglevel = INFO - akka.log-dead-letters = 3 + akka.log-dead-letters = 4 akka.log-dead-letters-suspend-duration = 2s"); private readonly IActorRef _deadActor; + private readonly IActorRef _droppingActor; + private readonly IActorRef _unhandledActor; public DeadLetterSuspensionSpec() : base(Config) @@ -29,10 +64,20 @@ public DeadLetterSuspensionSpec() Watch(_deadActor); _deadActor.Tell(PoisonPill.Instance); ExpectTerminated(_deadActor); + + _droppingActor = Sys.ActorOf(Dropping.Props(), "droppingActor"); + _unhandledActor = Sys.ActorOf(Unandled.Props(), "unhandledActor"); } private string ExpectedDeadLettersLogMessage(int count) => - $"Message [{count.GetType().Name}] from {TestActor.Path} to {_deadActor.Path} was not delivered. [{count}] dead letters encountered"; + $"Message [{count.GetType().Name}] from {TestActor} to {_deadActor} was not delivered. [{count}] dead letters encountered"; + + private string ExpectedDroppedLogMessage(int count) => + $"Message [{count.GetType().Name}] to {_droppingActor} was dropped. Don't like numbers. [{count}] dead letters encountered"; + + private string ExpectedUnhandledLogMessage(int count) => + $"Message [{count.GetType().Name}] from {TestActor} to {_unhandledActor} was unhandled. [{count}] dead letters encountered"; + [Fact] public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letters_and_then_re_enable() @@ -42,28 +87,31 @@ public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letter .Expect(1, () => _deadActor.Tell(1)); EventFilter - .Info(start: ExpectedDeadLettersLogMessage(2)) - .Expect(1, () => _deadActor.Tell(2)); + .Info(start: ExpectedDroppedLogMessage(2)) + .Expect(1, () => _droppingActor.Tell(2)); EventFilter - .Info(start: ExpectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next") - .Expect(1, () => _deadActor.Tell(3)); + .Info(start: ExpectedUnhandledLogMessage(3)) + .Expect(1, () => _unhandledActor.Tell(3)); - _deadActor.Tell(4); + EventFilter + .Info(start: ExpectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next") + .Expect(1, () => _deadActor.Tell(4)); _deadActor.Tell(5); + _droppingActor.Tell(6); // let suspend-duration elapse Thread.Sleep(2050); // re-enabled EventFilter - .Info(start: ExpectedDeadLettersLogMessage(6) + ", of which 2 were not logged") - .Expect(1, () => _deadActor.Tell(6)); + .Info(start: ExpectedDeadLettersLogMessage(7) + ", of which 2 were not logged") + .Expect(1, () => _deadActor.Tell(7)); // reset count EventFilter .Info(start: ExpectedDeadLettersLogMessage(1)) - .Expect(1, () => _deadActor.Tell(7)); + .Expect(1, () => _deadActor.Tell(8)); } } } diff --git a/src/core/Akka.Tests/Event/EventStreamSpec.cs b/src/core/Akka.Tests/Event/EventStreamSpec.cs index c7f9cc45d8f..9347b5ac306 100644 --- a/src/core/Akka.Tests/Event/EventStreamSpec.cs +++ b/src/core/Akka.Tests/Event/EventStreamSpec.cs @@ -296,6 +296,7 @@ private static string GetDebugUnhandledMessagesConfig() akka { actor.serialize-messages = off actor.debug.unhandled = on + log-dead-letters = off stdout-loglevel = DEBUG loglevel = DEBUG loggers = [""%logger%""] diff --git a/src/core/Akka/Actor/ActorSelection.cs b/src/core/Akka/Actor/ActorSelection.cs index d9bcea3c6a7..ec5e21f53f6 100644 --- a/src/core/Akka/Actor/ActorSelection.cs +++ b/src/core/Akka/Actor/ActorSelection.cs @@ -63,10 +63,10 @@ public ActorSelection(IActorRef anchor, SelectionPathElement[] path) /// The anchor. /// The path. public ActorSelection(IActorRef anchor, string path) - : this(anchor, path == "" ? new string[] {} : path.Split('/')) + : this(anchor, path == "" ? new string[] { } : path.Split('/')) { } - + /// /// Initializes a new instance of the class. /// @@ -78,16 +78,16 @@ public ActorSelection(IActorRef anchor, IEnumerable elements) var list = new List(); var iter = elements.Iterator(); - while(!iter.IsEmpty()) + while (!iter.IsEmpty()) { var s = iter.Next(); - switch(s) + switch (s) { case null: case "": break; case "**": - if(!iter.IsEmpty()) + if (!iter.IsEmpty()) throw new IllegalActorNameException("Double wildcard can only appear at the last path entry"); list.Add(new SelectChildRecursive()); break; @@ -115,7 +115,7 @@ public void Tell(object message, IActorRef sender = null) if (sender == null && ActorCell.Current != null && ActorCell.Current.Self != null) sender = ActorCell.Current.Self; - DeliverSelection(Anchor as IInternalActorRef, sender, + DeliverSelection(Anchor as IInternalActorRef, sender, new ActorSelectionMessage(message, Path, wildCardFanOut: false)); } @@ -124,7 +124,7 @@ public void Tell(object message, IActorRef sender = null) /// The result is returned as a Task that is completed with the /// if such an actor exists. It is completed with failure if /// no such actor exists or the identification didn't complete within the supplied . - /// + /// /// Under the hood it talks to the actor to verify its existence and acquire its /// /// @@ -141,7 +141,7 @@ public void Tell(object message, IActorRef sender = null) /// The result is returned as a Task that is completed with the /// if such an actor exists. It is completed with failure if /// no such actor exists or the identification didn't complete within the supplied . - /// + /// /// Under the hood it talks to the actor to verify its existence and acquire its /// /// @@ -161,17 +161,17 @@ private async Task InnerResolveOne(TimeSpan timeout, CancellationToke try { var identity = await this.Ask(new Identify(null), timeout, ct).ConfigureAwait(false); - if(identity.Subject == null) + if (identity.Subject == null) throw new ActorNotFoundException("subject was null"); return identity.Subject; } - catch(Exception ex) + catch (Exception ex) { throw new ActorNotFoundException("Exception occurred while resolving ActorSelection", ex); } } - + /// /// INTERNAL API /// Convenience method used by remoting when receiving from a remote @@ -199,7 +199,7 @@ void Rec(IInternalActorRef actorRef) path: anchor.Path / sel.Elements.Select(el => el.ToString()), eventStream: refWithCell.Underlying.System.EventStream); - switch(iter.Next()) + switch (iter.Next()) { case SelectParent _: var parent = actorRef.Parent; @@ -259,7 +259,7 @@ void Rec(IInternalActorRef actorRef) } else { - // don't send to emptyRef after wildcard fan-out + // don't send to emptyRef after wildcard fan-out if (matchingChildren.Count == 0 && !sel.WildCardFanOut) emptyRef.Tell(sel, sender); else @@ -269,7 +269,7 @@ void Rec(IInternalActorRef actorRef) elements: iter.ToVector().ToArray(), wildCardFanOut: sel.WildCardFanOut || matchingChildren.Count > 1); - for(var i = 0; i < matchingChildren.Count; i++) + for (var i = 0; i < matchingChildren.Count; i++) DeliverSelection(matchingChildren[i] as IInternalActorRef, sender, message); } } @@ -326,7 +326,7 @@ public override string ToString() /// /// Used to deliver messages via . /// - public class ActorSelectionMessage : IAutoReceivedMessage, IPossiblyHarmful + public class ActorSelectionMessage : IAutoReceivedMessage, IPossiblyHarmful, IWrappedMessage { /// /// Initializes a new instance of the class. @@ -380,7 +380,7 @@ public ActorSelectionMessage Copy(object message = null, SelectionPathElement[] /// /// Class SelectionPathElement. /// - public abstract class SelectionPathElement + public abstract class SelectionPathElement { } @@ -470,7 +470,7 @@ public override bool Equals(object obj) { if (obj is null) return false; if (ReferenceEquals(this, obj)) return true; - if(!(obj is SelectChildRecursive)) return false; + if (!(obj is SelectChildRecursive)) return false; return true; } diff --git a/src/core/Akka/Actor/BuiltInActors.cs b/src/core/Akka/Actor/BuiltInActors.cs index ff3e11f7a0b..a2aa954b9c4 100644 --- a/src/core/Akka/Actor/BuiltInActors.cs +++ b/src/core/Akka/Actor/BuiltInActors.cs @@ -41,9 +41,9 @@ public class GuardianActor : ActorBase, IRequiresMessageQueueTBD protected override bool Receive(object message) { - if(message is Terminated) + if (message is Terminated) Context.Stop(Self); - else if(message is StopChild) + else if (message is StopChild) Context.Stop(((StopChild)message).Child); else Context.System.DeadLetters.Tell(new DeadLetter(message, Sender, Self), Sender); @@ -60,8 +60,8 @@ protected override void PreStart() } /// - /// System guardian. - /// + /// System guardian. + /// /// Root actor for all actors under the /system path. /// public class SystemGuardianActor : ActorBase, IRequiresMessageQueue @@ -87,16 +87,16 @@ public SystemGuardianActor(IActorRef userGuardian) protected override bool Receive(object message) { var terminated = message as Terminated; - if(terminated != null) + if (terminated != null) { var terminatedActor = terminated.ActorRef; - if(_userGuardian.Equals(terminatedActor)) + if (_userGuardian.Equals(terminatedActor)) { // time for the systemGuardian to stop, but first notify all the // termination hooks, they will reply with TerminationHookDone // and when all are done the systemGuardian is stopped Context.Become(Terminating); - foreach(var terminationHook in _terminationHooks) + foreach (var terminationHook in _terminationHooks) { terminationHook.Tell(TerminationHook.Instance); } @@ -110,17 +110,17 @@ protected override bool Receive(object message) } return true; } - + var stopChild = message as StopChild; - if(stopChild != null) + if (stopChild != null) { Context.Stop(stopChild.Child); return true; } var sender = Sender; - + var registerTerminationHook = message as RegisterTerminationHook; - if(registerTerminationHook != null && !ReferenceEquals(sender, Context.System.DeadLetters)) + if (registerTerminationHook != null && !ReferenceEquals(sender, Context.System.DeadLetters)) { _terminationHooks.Add(sender); Context.Watch(sender); @@ -133,7 +133,7 @@ protected override bool Receive(object message) private bool Terminating(object message) { var terminated = message as Terminated; - if(terminated != null) + if (terminated != null) { StopWhenAllTerminationHooksDone(terminated.ActorRef); return true; @@ -141,7 +141,7 @@ private bool Terminating(object message) var sender = Sender; var terminationHookDone = message as TerminationHookDone; - if(terminationHookDone != null) + if (terminationHookDone != null) { StopWhenAllTerminationHooksDone(sender); return true; @@ -158,7 +158,7 @@ private void StopWhenAllTerminationHooksDone(IActorRef terminatedActor) private void StopWhenAllTerminationHooksDone() { - if(_terminationHooks.Count == 0) + if (_terminationHooks.Count == 0) { var actorSystem = Context.System; actorSystem.EventStream.StopDefaultLoggers(actorSystem); @@ -178,6 +178,25 @@ protected override void PreRestart(Exception reason, object message) } } + /// + /// Message envelopes may implement this trait for better logging, such as logging of + /// message class name of the wrapped message instead of the envelope class name. + /// + public interface IWrappedMessage + { + object Message { get; } + } + + public static class WrappedMessage + { + public static object Unwrap(object message) + { + if (message is IWrappedMessage wm) + return Unwrap(wm.Message); + return message; + } + } + /// /// Class DeadLetterActorRef. /// diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index 52c1194ac65..2c37220d98a 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -59,8 +59,7 @@ public EmptyLocalActorRef(IActorRefProvider provider, ActorPath path, EventStrea protected override void TellInternal(object message, IActorRef sender) { if (message == null) throw new InvalidMessageException("Message is null"); - var d = message as DeadLetter; - if (d != null) SpecialHandle(d.Message, d.Sender); + if (message is DeadLetter d) SpecialHandle(d.Message, d.Sender); else if (!SpecialHandle(message, sender)) { _eventStream.Publish(new DeadLetter(message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); @@ -85,8 +84,7 @@ public override void SendSystemMessage(ISystemMessage message) /// TBD protected virtual bool SpecialHandle(object message, IActorRef sender) { - var watch = message as Watch; - if (watch != null) + if (message is Watch watch) { if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) { @@ -97,37 +95,45 @@ protected virtual bool SpecialHandle(object message, IActorRef sender) if (message is Unwatch) return true; //Just ignore - var identify = message as Identify; - if (identify != null) + if (message is Identify identify) { sender.Tell(new ActorIdentity(identify.MessageId, null)); return true; } - var actorSelectionMessage = message as ActorSelectionMessage; - if (actorSelectionMessage != null) + if (message is ActorSelectionMessage actorSelectionMessage) { - var selectionIdentify = actorSelectionMessage.Message as Identify; - if (selectionIdentify != null) + if (actorSelectionMessage.Message is Identify selectionIdentify) { if (!actorSelectionMessage.WildCardFanOut) sender.Tell(new ActorIdentity(selectionIdentify.MessageId, null)); } else { - _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + if (actorSelectionMessage.Message is IDeadLetterSuppression selectionDeadLetterSuppression) + { + PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender); + } + else + { + _eventStream.Publish(new DeadLetter(actorSelectionMessage.Message, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + } } return true; } - var deadLetterSuppression = message as IDeadLetterSuppression; - if (deadLetterSuppression != null) + if (message is IDeadLetterSuppression deadLetterSuppression) { - _eventStream.Publish(new SuppressedDeadLetter(deadLetterSuppression, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + PublishSupressedDeadLetter(deadLetterSuppression, sender); return true; } return false; } + + private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender) + { + _eventStream.Publish(new SuppressedDeadLetter(msg, sender.IsNobody() ? _provider.DeadLetters : sender, this)); + } } } diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index 4b06d433871..a2d17b96a50 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -2,34 +2,34 @@ #################################### # Akka Actor Reference Config File # #################################### - + # This is the reference config file that contains all the default settings. # Make your edits/overrides in your application.conf. - + akka { # Akka version, checked against the runtime version of Akka. version = "0.0.1 Akka" - + # Home directory of Akka, modules in the deploy directory will be loaded home = "" - + # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs # to STDOUT) loggers = ["Akka.Event.DefaultLogger"] # Specifies the default loggers dispatcher loggers-dispatcher = "akka.actor.default-dispatcher" - + # Loggers are created and registered synchronously during ActorSystem # start-up, and since they are actors, this timeout is used to bound the # waiting time logger-startup-timeout = 5s - + # You can enable asynchronous loggers creation by setting this to `true`. # This may be useful in cases when ActorSystem creation takes more time # then it should, or for whatever other reason logger-async-start = false - + # Log level used by the configured loggers (see "loggers") as soon # as they have been started; before that, see "stdout-loglevel" # Options: OFF, ERROR, WARNING, INFO, DEBUG @@ -38,22 +38,23 @@ akka { # Suppresses warning about usage of the default (JSON.NET) serializer # which is going to be obsoleted at v1.5 suppress-json-serializer-warning = on #disabled as of 1.3.2, given that the 1.5 timeframe is far out - + # Log level for the very basic logger activated during AkkaApplication startup # Options: OFF, ERROR, WARNING, INFO, DEBUG stdout-loglevel = "WARNING" - + # Log the complete configuration at INFO level when the actor system is started. # This is useful when you are uncertain of what configuration is used. log-config-on-start = off - - # Log at info level when messages are sent to dead letters. + + # Log at info level when messages are sent to dead letters, or published to + # eventStream as `DeadLetter`, `Dropped` or `UnhandledMessage`. # Possible values: # on: all dead letters are logged # off: no logging of dead letters # n: positive integer, number of dead letters that will be logged log-dead-letters = 10 - + # Possibility to turn off logging of dead letters while the actor system # is shutting down. Logging is only done when enabled by 'log-dead-letters' # setting. @@ -63,48 +64,48 @@ akka { # infinite: suspend the logging forever; # or a duration (eg: 5 minutes), after which the logging will be re-enabled. log-dead-letters-suspend-duration = 5 minutes - + # List FQCN of extensions which shall be loaded at actor system startup. # Should be on the format: 'extensions = ["foo", "bar"]' etc. # See the Akka Documentation for more info about Extensions extensions = [] - + # Toggles whether threads created by this ActorSystem should be daemons or not daemonic = off - + # THIS DOES NOT APPLY TO .NET # # JVM shutdown, System.exit(-1), in case of a fatal error, # such as OutOfMemoryError #jvm-exit-on-fatal-error = on - + actor { - + # FQCN of the ActorRefProvider to be used; the below is the built-in default, # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "Akka.Actor.LocalActorRefProvider" - + # The guardian "/user" will use this class to obtain its supervisorStrategy. # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. # In addition to the default there is Akka.Actor.StoppingSupervisorStrategy. guardian-supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy" - + # Timeout for ActorSystem.actorOf creation-timeout = 20s - + # Frequency with which stopping actors are prodded in case they had to be # removed from their parents reaper-interval = 5 - + # Serializes and deserializes (non-primitive) messages to ensure immutability, # this is only intended for testing. serialize-messages = off - + # Serializes and deserializes creators (in Props) to ensure that they can be # sent over the network, this is only intended for testing. Purely local deployments # as marked with deploy.scope == LocalScope are exempt from verification. serialize-creators = off - + # Timeout for send operations to top-level actors which are in the process # of being started. This is only relevant if using a bounded mailbox or the # CallingThreadDispatcher for a top-level actor. @@ -112,7 +113,7 @@ akka { # Default timeout for IActorRef.Ask. ask-timeout = infinite - + # THIS DOES NOT APPLY TO .NET # @@ -125,7 +126,7 @@ akka { inbox-size = 1000, default-timeout = 5s } - + # Mapping between ´deployment.router' short names to fully qualified class names router.type-mapping { from-code = "Akka.Routing.NoRouter" @@ -148,18 +149,18 @@ akka { cluster-metrics-adaptive-pool = "Akka.Cluster.Metrics.AdaptiveLoadBalancingPool, Akka.Cluster.Metrics" cluster-metrics-adaptive-group = "Akka.Cluster.Metrics.AdaptiveLoadBalancingGroup, Akka.Cluster.Metrics" } - + deployment { - + # deployment id pattern - on the format: /parent/child etc. default { - + # The id of the dispatcher to use for this actor. # If undefined or empty the dispatcher specified in code # (Props.withDispatcher) is used, or default-dispatcher if not # specified at all. dispatcher = "" - + # The id of the mailbox to use for this actor. # If undefined or empty the default mailbox of the configured dispatcher # is used or if there is no mailbox configuration the mailbox specified @@ -167,7 +168,7 @@ akka { # If there is a mailbox defined in the configured dispatcher then that # overrides this setting. mailbox = "" - + # routing (load-balance) scheme to use # - available: "from-code", "round-robin", "random", "smallest-mailbox", # "scatter-gather", "broadcast" @@ -188,46 +189,46 @@ akka { # - resizer: dynamically resizable number of routees as specified in # resizer below router = "from-code" - + # number of children to create in case of a router; # this setting is ignored if routees.paths is given nr-of-instances = 1 - + # within is the timeout used for routers containing future calls within = 5 s - + # number of virtual nodes per node for consistent-hashing router virtual-nodes-factor = 10 - + routees { # Alternatively to giving nr-of-instances you can specify the full # paths of those actors which should be routed to. This setting takes # precedence over nr-of-instances paths = [] } - + # To use a dedicated dispatcher for the routees of the pool you can - # define the dispatcher configuration inline with the property name + # define the dispatcher configuration inline with the property name # 'pool-dispatcher' in the deployment section of the router. # For example: # pool-dispatcher { # fork-join-executor.parallelism-min = 5 # fork-join-executor.parallelism-max = 5 # } - + # Routers with dynamically resizable number of routees; this feature is # enabled by including (parts of) this section in the deployment resizer { - + enabled = off - + # The fewest number of routees the router should ever have. lower-bound = 1 - + # The most number of routees the router should ever have. # Must be greater than or equal to lower-bound. upper-bound = 10 - + # Threshold used to evaluate if a routee is considered to be busy # (under pressure). Implementation depends on this value (default is 1). # 0: number of routees currently processing a message. @@ -237,12 +238,12 @@ akka { # messages in their mailbox. Note that estimating mailbox size of # default UnboundedMailbox is O(N) operation. pressure-threshold = 1 - + # Percentage to increase capacity whenever all routees are busy. # For example, 0.2 would increase 20% (rounded up), i.e. if current # capacity is 6 it will request an increase of 2 more routees. rampup-rate = 0.2 - + # Minimum fraction of busy routees before backing off. # For example, if this is 0.3, then we'll remove some routees only when # less than 30% of routees are busy, i.e. if current capacity is 10 and @@ -250,13 +251,13 @@ akka { # the capacity is decreased. # Use 0.0 or negative to avoid removal of routees. backoff-threshold = 0.3 - + # Fraction of routees to be removed when the resizer reaches the # backoffThreshold. # For example, 0.1 would decrease 10% (rounded up), i.e. if current # capacity is 9 it will request an decrease of 1 routee. backoff-rate = 0.1 - + # Number of messages between resize operation. # Use 1 to resize before each message. messages-per-resize = 10 @@ -287,7 +288,7 @@ akka { threadtype = background #values can be "background" or "foreground" } } - + default-dispatcher { # Must be one of the following # Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting @@ -337,22 +338,22 @@ akka { # For running in current synchronization contexts current-context-executor{} - + # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s - + # Throughput defines the number of messages that are processed in a batch # before the thread is returned to the pool. Set to 1 for as fair as possible. throughput = 30 - + # Throughput deadline for Dispatcher, set to 0 or negative for no deadline throughput-deadline-time = 0ms - + # For BalancingDispatcher: If the balancing dispatcher should attempt to # schedule idle actors using the same dispatcher when a message comes in, # and the dispatchers ExecutorService is not fully busy already. attempt-teamwork = on - + # If this dispatcher requires a specific type of mailbox, specify the # fully-qualified class name here; the actually created mailbox will # be a subtype of this type. The empty string signifies no requirement. @@ -373,7 +374,7 @@ akka { parallelism-max = 64 } } - + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" @@ -392,26 +393,26 @@ akka { # constructor with # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "Akka.Dispatch.UnboundedMailbox" - + # If the mailbox is bounded then it uses this setting to determine its # capacity. The provided value must be positive. # NOTICE: # Up to version 2.1 the mailbox type was determined based on this setting; # this is no longer the case, the type must explicitly be a bounded mailbox. mailbox-capacity = 1000 - + # If the mailbox is bounded then this is the timeout for enqueueing # in case the mailbox is full. Negative values signify infinite # timeout, which should be avoided as it bears the risk of dead-lock. mailbox-push-timeout-time = 10s - + # For Actor with Stash: The default capacity of the stash. # If negative (or zero) then an unbounded stash is used (default) # If positive then a bounded stash is used and the capacity is set using # the property stash-capacity = -1 } - + mailbox { # Mapping between message queue semantics and mailbox configurations. # Used by akka.dispatch.RequiresMessageQueue[T] to enforce different @@ -428,28 +429,28 @@ akka { "Akka.Dispatch.IMultipleConsumerSemantics" = akka.actor.mailbox.unbounded-queue-based "Akka.Event.ILoggerMessageQueueSemantics" = akka.actor.mailbox.logger-queue } - + unbounded-queue-based { # FQCN of the MailboxType, The Class of the FQCN must have a public # constructor with (akka.actor.ActorSystem.Settings, # com.typesafe.config.Config) parameters. mailbox-type = "Akka.Dispatch.UnboundedMailbox" } - + bounded-queue-based { # FQCN of the MailboxType, The Class of the FQCN must have a public # constructor with (akka.actor.ActorSystem.Settings, # com.typesafe.config.Config) parameters. mailbox-type = "Akka.Dispatch.BoundedMailbox" } - + unbounded-deque-based { # FQCN of the MailboxType, The Class of the FQCN must have a public # constructor with (akka.actor.ActorSystem.Settings, # com.typesafe.config.Config) parameters. mailbox-type = "Akka.Dispatch.UnboundedDequeBasedMailbox" } - + bounded-deque-based { # FQCN of the MailboxType, The Class of the FQCN must have a public # constructor with (akka.actor.ActorSystem.Settings, @@ -464,39 +465,39 @@ akka { mailbox-type = "Akka.Event.LoggerMailboxType" } } - + debug { # enable function of Actor.loggable(), which is to log any received message # at DEBUG level, see the “Testing Actor Systems” section of the Akka # Documentation at http://akka.io/docs receive = off - + # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill et.c.) autoreceive = off - + # enable DEBUG logging of actor lifecycle changes lifecycle = off - + # enable DEBUG logging of all LoggingFSMs for events, transitions and timers fsm = off - + # enable DEBUG logging of subscription changes on the eventStream event-stream = off - + # enable DEBUG logging of unhandled messages unhandled = off - + # enable WARN logging of misconfigured routers router-misconfiguration = off } - + # Entries for pluggable serializers and their bindings. serializers { json = "Akka.Serialization.NewtonSoftJsonSerializer, Akka" bytes = "Akka.Serialization.ByteArraySerializer, Akka" } - + # Class to Serializer binding. You only need to specify the name of an # interface or abstract base class of the messages. In case of ambiguity it # is using the most specific configured class, or giving a warning and @@ -519,13 +520,13 @@ akka { "Akka.Serialization.ByteArraySerializer, Akka" = 4 "Akka.Serialization.NewtonSoftJsonSerializer, Akka" = 1 } - + # extra settings that can be custom to a serializer implementation serialization-settings { - + } } - + # Used to set the behavior of the scheduler. # Changing the default values may change the system behavior drastically so make # sure you know what you're doing! See the Scheduler section of the Akka @@ -541,14 +542,14 @@ akka { # tick-duration to a high value will make shutting down the actor system # take longer. tick-duration = 10ms - + # The timer uses a circular wheel of buckets to store the timer tasks. # This should be set such that the majority of scheduled timeouts (for high # scheduling frequency) will be shorter than one rotation of the wheel # (ticks-per-wheel * ticks-duration) # THIS MUST BE A POWER OF TWO! ticks-per-wheel = 512 - + # This setting selects the timer implementation which shall be loaded at # system start-up. # The class given here must implement the akka.actor.Scheduler interface @@ -557,14 +558,14 @@ akka { # 2) akka.event.LoggingAdapter # 3) java.util.concurrent.ThreadFactory implementation = "Akka.Actor.HashedWheelTimerScheduler" - + # When shutting down the scheduler, there will typically be a thread which # needs to be stopped, and this timeout determines how long to wait for # that to happen. In case of timeout the shutdown of the actor system will # proceed without running possibly still enqueued tasks. shutdown-timeout = 5s - } - + } + io { # By default the select loops run on dedicated threads, hence using a @@ -577,8 +578,8 @@ akka { tcp { # Implementation of `Akka.IO.Buffers.IBufferPool` interface. It - # allocates memory is so called segments. Each segment is then cut into - # buffers of equal size (see: `buffers-per-segment`). Those buffers are + # allocates memory is so called segments. Each segment is then cut into + # buffers of equal size (see: `buffers-per-segment`). Those buffers are # then lend to the requestor. They have to be released later on. direct-buffer-pool { @@ -590,8 +591,8 @@ akka { buffer-size = 512 # Number of byte buffers per segment. Every segement is a single continuous - # byte array in memory. Once buffer pool will run out of byte buffers to - # lend it will allocate a next segment of memory. + # byte array in memory. Once buffer pool will run out of byte buffers to + # lend it will allocate a next segment of memory. # Each segments size is equal to `buffer-size` * `buffers-per-segment`. buffers-per-segment = 500 @@ -604,9 +605,9 @@ akka { buffer-pool-limit = 1024 } - # Default implementation of `Akka.IO.Buffers.IBufferPool` interface. + # Default implementation of `Akka.IO.Buffers.IBufferPool` interface. # Instead of maintaining allocated buffers and reusing them - # between different SocketAsyncEventArgs instances, it allocates new + # between different SocketAsyncEventArgs instances, it allocates new # buffer each time when new socket connection is established. disabled-buffer-pool { @@ -619,7 +620,7 @@ akka { } # A buffer pool used to acquire and release byte buffers from the managed - # heap. Once byte buffer is no longer needed is can be released, landing + # heap. Once byte buffer is no longer needed is can be released, landing # on the pool again, to be reused later. This way we can reduce a GC pressure # by reusing the same components instead of recycling them. # NOTE: pooling is disabled by default, to enable use direct-buffer-pool: @@ -643,7 +644,7 @@ akka { # higher numbers decrease latency, lower numbers increase fairness on # the worker-dispatcher batch-accept-limit = 10 - + # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. register-timeout = 5s @@ -651,7 +652,7 @@ akka { # The maximum number of bytes delivered by a `Received` message. Before # more data is read from the network the connection actor will try to # do other work. - # The purpose of this setting is to impose a smaller limit than the + # The purpose of this setting is to impose a smaller limit than the # configured receive buffer size. When using value 'unlimited' it will # try to read all from the receive buffer. max-received-message-size = unlimited @@ -707,10 +708,10 @@ akka { } udp { - + # Default implementation of `Akka.IO.Buffers.IBufferPool` interface. It - # allocates memory is so called segments. Each segment is then cut into - # buffers of equal size (see: `buffers-per-segment`). Those buffers are + # allocates memory is so called segments. Each segment is then cut into + # buffers of equal size (see: `buffers-per-segment`). Those buffers are # then lend to the requestor. They have to be released later on. direct-buffer-pool { @@ -722,8 +723,8 @@ akka { buffer-size = 512 # Number of byte buffers per segment. Every segement is a single continuous - # byte array in memory. Once buffer pool will run out of byte buffers to - # lend it will allocate a next segment of memory. + # byte array in memory. Once buffer pool will run out of byte buffers to + # lend it will allocate a next segment of memory. # Each segments size is equal to `buffer-size` * `buffers-per-segment`. buffers-per-segment = 500 @@ -737,7 +738,7 @@ akka { } # A buffer pool used to acquire and release byte buffers from the managed - # heap. Once byte buffer is no longer needed is can be released, landing + # heap. Once byte buffer is no longer needed is can be released, landing # on the pool again, to be reused later. This way we can reduce a GC pressure # by reusing the same components instead of recycling them. buffer-pool = "akka.io.udp.direct-buffer-pool" @@ -745,7 +746,7 @@ akka { # The number of selectors to stripe the served channels over; each of # these will use one select loop on the selector-dispatcher. nr-of-socket-async-event-args = 32 - + # Maximum number of open channels supported by this UDP module Generally # UDP does not require a large number of channels, therefore it is # recommended to keep this setting low. @@ -804,8 +805,8 @@ akka { udp-connected { # Default implementation of `Akka.IO.Buffers.IBufferPool` interface. It - # allocates memory is so called segments. Each segment is then cut into - # buffers of equal size (see: `buffers-per-segment`). Those buffers are + # allocates memory is so called segments. Each segment is then cut into + # buffers of equal size (see: `buffers-per-segment`). Those buffers are # then lend to the requestor. They have to be released later on. direct-buffer-pool { @@ -817,8 +818,8 @@ akka { buffer-size = 512 # Number of byte buffers per segment. Every segement is a single continuous - # byte array in memory. Once buffer pool will run out of byte buffers to - # lend it will allocate a next segment of memory. + # byte array in memory. Once buffer pool will run out of byte buffers to + # lend it will allocate a next segment of memory. # Each segments size is equal to `buffer-size` * `buffers-per-segment`. buffers-per-segment = 500 @@ -832,7 +833,7 @@ akka { } # A buffer pool used to acquire and release byte buffers from the managed - # heap. Once byte buffer is no longer needed is can be released, landing + # heap. Once byte buffer is no longer needed is can be released, landing # on the pool again, to be reused later. This way we can reduce a GC pressure # by reusing the same components instead of recycling them. buffer-pool = "akka.io.udp-connected.direct-buffer-pool" @@ -930,22 +931,22 @@ akka { # - JVM shutdown hook will by default run CoordinatedShutdown # - Cluster node will automatically run CoordinatedShutdown when it # sees itself as Exiting - # - A management console or other application specific command can + # - A management console or other application specific command can # run CoordinatedShutdown coordinated-shutdown { - # The timeout that will be used for a phase if not specified with + # The timeout that will be used for a phase if not specified with # 'timeout' in the phase default-phase-timeout = 5 s - + # Terminate the ActorSystem in the last phase actor-system-terminate. terminate-actor-system = on - + # Exit the CLR(Environment.Exit(0)) in the last phase actor-system-terminate - # if this is set to 'on'. It is done after termination of the - # ActorSystem if terminate-actor-system=on, otherwise it is done - # immediately when the last phase is reached. + # if this is set to 'on'. It is done after termination of the + # ActorSystem if terminate-actor-system=on, otherwise it is done + # immediately when the last phase is reached. exit-clr = off - + # Run the coordinated shutdown when the CLR process exits, e.g. # via kill SIGTERM signal (SIGINT ctrl-c doesn't work). run-by-clr-shutdown-hook = on @@ -954,11 +955,11 @@ akka { # Enabling this and disabling terminate-actor-system is not a supported # combination (will throw ConfigurationException at startup). run-by-actor-system-terminate = on - + #//#coordinated-shutdown-phases # CoordinatedShutdown will run the tasks that are added to these - # phases. The phases can be ordered as a DAG by defining the - # dependencies between the phases. + # phases. The phases can be ordered as a DAG by defining the + # dependencies between the phases. # Each phase is defined as a named config section with the # following optional properties: # - timeout=15s: Override the default-phase-timeout for this phase. @@ -969,68 +970,68 @@ akka { # The first pre-defined phase that applications can add tasks to. # Note that more phases can be be added in the application's - # configuration by overriding this phase with an additional + # configuration by overriding this phase with an additional # depends-on. before-service-unbind { } - + # Stop accepting new incoming requests in for example HTTP. service-unbind { depends-on = [before-service-unbind] } - + # Wait for requests that are in progress to be completed. service-requests-done { depends-on = [service-unbind] } - + # Final shutdown of service endpoints. service-stop { depends-on = [service-requests-done] } - + # Phase for custom application tasks that are to be run # after service shutdown and before cluster shutdown. before-cluster-shutdown { depends-on = [service-stop] } - + # Graceful shutdown of the Cluster Sharding regions. cluster-sharding-shutdown-region { timeout = 10 s depends-on = [before-cluster-shutdown] } - + # Emit the leave command for the node that is shutting down. cluster-leave { depends-on = [cluster-sharding-shutdown-region] } - + # Shutdown cluster singletons cluster-exiting { timeout = 10 s depends-on = [cluster-leave] } - + # Wait until exiting has been completed cluster-exiting-done { depends-on = [cluster-exiting] } - + # Shutdown the cluster extension cluster-shutdown { depends-on = [cluster-exiting-done] } - + # Phase for custom application tasks that are to be run # after cluster shutdown and before ActorSystem termination. before-actor-system-terminate { depends-on = [cluster-shutdown] } - + # Last phase. See terminate-actor-system and exit-jvm above. - # Don't add phases that depends on this phase because the - # dispatcher and scheduler of the ActorSystem have been shutdown. + # Don't add phases that depends on this phase because the + # dispatcher and scheduler of the ActorSystem have been shutdown. actor-system-terminate { timeout = 10 s depends-on = [before-actor-system-terminate] diff --git a/src/core/Akka/Event/DeadLetter.cs b/src/core/Akka/Event/DeadLetter.cs index f11e4bea15f..6ba4cf377a4 100644 --- a/src/core/Akka/Event/DeadLetter.cs +++ b/src/core/Akka/Event/DeadLetter.cs @@ -20,10 +20,13 @@ public interface IDeadLetterSuppression } /// - /// Represents a message that could not be delivered to it's recipient. + /// Represents a message that could not be delivered to it's recipient. /// This message wraps the original message, the sender and the intended recipient of the message. + /// + /// Subscribe to this class to be notified about all (also the suppressed ones) + /// and . /// - public abstract class AllDeadLetters + public abstract class AllDeadLetters : IWrappedMessage { /// /// Initializes a new instance of the class. @@ -106,4 +109,26 @@ public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IA if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null"); } } + + /// + /// Envelope that is published on the eventStream wrapped in for every message that is + /// dropped due to overfull queues or routers with no routees. + /// + /// When this message was sent without a sender , `sender` will be , i.e. `null`. + /// + public sealed class Dropped : AllDeadLetters + { + public Dropped(object message, string reason, IActorRef sender, IActorRef recipient) + : base(message, sender, recipient) + { + Reason = reason; + } + + public Dropped(object message, string reason, IActorRef recipient) + : this(message, reason, ActorRefs.NoSender, recipient) + { + } + + public string Reason { get; } + } } diff --git a/src/core/Akka/Event/DeadLetterListener.cs b/src/core/Akka/Event/DeadLetterListener.cs index 33843f87f8e..bcc83eced0e 100644 --- a/src/core/Akka/Event/DeadLetterListener.cs +++ b/src/core/Akka/Event/DeadLetterListener.cs @@ -41,6 +41,8 @@ protected override void PreRestart(Exception reason, object message) protected override void PreStart() { _eventStream.Subscribe(Self, typeof(DeadLetter)); + _eventStream.Subscribe(Self, typeof(Dropped)); + _eventStream.Subscribe(Self, typeof(UnhandledMessage)); } /// @@ -85,10 +87,13 @@ private Receive ReceiveWithAlwaysLogging() { return message => { - if (message is DeadLetter deadLetter) + if (message is AllDeadLetters d) { - IncrementCount(); - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, ""); + if (!IsWrappedSuppressed(d)) + { + IncrementCount(); + LogDeadLetter(d, ""); + } return true; } return false; @@ -99,17 +104,20 @@ private Receive ReceiveWithMaxCountLogging() { return message => { - if (message is DeadLetter deadLetter) + if (message is AllDeadLetters d) { - IncrementCount(); - if (_count == _maxCount) - { - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, ", no more dead letters will be logged"); - Context.Stop(Self); - } - else + if (!IsWrappedSuppressed(d)) { - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, ""); + IncrementCount(); + if (_count == _maxCount) + { + LogDeadLetter(d, ", no more dead letters will be logged"); + Context.Stop(Self); + } + else + { + LogDeadLetter(d, ""); + } } return true; } @@ -121,18 +129,21 @@ private Receive ReceiveWithSuspendLogging(TimeSpan suspendDuration) { return message => { - if (message is DeadLetter deadLetter) + if (message is AllDeadLetters d) { - IncrementCount(); - if (_count == _maxCount) + if (!IsWrappedSuppressed(d)) { - var doneMsg = $", no more dead letters will be logged in next [{suspendDuration}]"; - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, doneMsg); - Context.Become(ReceiveWhenSuspended(suspendDuration, Deadline.Now + suspendDuration)); - } - else - { - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, ""); + IncrementCount(); + if (_count == _maxCount) + { + var doneMsg = $", no more dead letters will be logged in next [{suspendDuration}]"; + LogDeadLetter(d, doneMsg); + Context.Become(ReceiveWhenSuspended(suspendDuration, Deadline.Now + suspendDuration)); + } + else + { + LogDeadLetter(d, ""); + } } return true; } @@ -144,15 +155,18 @@ private Receive ReceiveWhenSuspended(TimeSpan suspendDuration, Deadline suspendD { return message => { - if (message is DeadLetter deadLetter) + if (message is AllDeadLetters d) { - IncrementCount(); - if (suspendDeadline.IsOverdue) + if (!IsWrappedSuppressed(d)) { - var doneMsg = $", of which {(_count - _maxCount - 1).ToString()} were not logged. The counter will be reset now"; - LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, doneMsg); - _count = 0; - Context.Become(ReceiveWithSuspendLogging(suspendDuration)); + IncrementCount(); + if (suspendDeadline.IsOverdue) + { + var doneMsg = $", of which {(_count - _maxCount - 1)} were not logged. The counter will be reset now"; + LogDeadLetter(d, doneMsg); + _count = 0; + Context.Become(ReceiveWithSuspendLogging(suspendDuration)); + } } return true; } @@ -160,19 +174,50 @@ private Receive ReceiveWhenSuspended(TimeSpan suspendDuration, Deadline suspendD }; } - private void LogDeadLetter(object message, IActorRef snd, IActorRef recipient, string doneMsg) + private void LogDeadLetter(AllDeadLetters d, string doneMsg) { - var messageType = ReferenceEquals(null, message) ? "null" : message.GetType().Name; - var origin = ReferenceEquals(snd, Context.System.DeadLetters) ? "without sender" : $"from {snd.Path}"; + var origin = IsReal(d.Sender) ? $" from {d.Sender}" : ""; + var unwrapped = WrappedMessage.Unwrap(d.Message); + var messageStr = unwrapped?.GetType().Name ?? "null"; + var wrappedIn = (d.Message is IWrappedMessage) ? $" wrapped in [${d.Message.GetType().Name}]" : ""; + + string logMessage; + switch (d) + { + case Dropped dropped: + var destination = IsReal(d.Recipient) ? $" to {d.Recipient}" : ""; + logMessage = $"Message [{messageStr}]{wrappedIn}{origin}{destination} was dropped. {dropped.Reason}. " + + $"[{_count}] dead letters encountered{doneMsg}. "; + break; + case UnhandledMessage unhandled: + destination = IsReal(d.Recipient) ? $" to {d.Recipient}" : ""; + logMessage = $"Message [{messageStr}]{wrappedIn}{origin}{destination} was unhandled. " + + $"[{_count}] dead letters encountered{doneMsg}. "; + break; + default: + logMessage = $"Message [{messageStr}]{wrappedIn}{origin} to {d.Recipient} was not delivered. " + + $"[{_count}] dead letters encountered{doneMsg}. " + + $"If this is not an expected behavior then {d.Recipient} may have terminated unexpectedly. "; + break; + } _eventStream.Publish(new Info( - recipient.Path.ToString(), - recipient.GetType(), - $"Message [{messageType}] {origin} to {recipient.Path} was not delivered. [{_count.ToString()}] dead letters encountered{doneMsg}. " + - $"If this is not an expected behavior then {recipient.Path} may have terminated unexpectedly. " + + d.Recipient.Path.ToString(), + d.Recipient.GetType(), + logMessage + "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + "and 'akka.log-dead-letters-during-shutdown'.")); } + private bool IsReal(IActorRef snd) + { + return !ReferenceEquals(snd, ActorRefs.NoSender) && !ReferenceEquals(snd, Context.System.DeadLetters) && !(snd is DeadLetterActorRef); + } + + private bool IsWrappedSuppressed(AllDeadLetters d) + { + return d is IWrappedMessage w && w.Message is IDeadLetterSuppression; + } + /// /// This class represents the latest date or time by which an operation should be completed. /// @@ -210,9 +255,9 @@ private void LogDeadLetter(object message, IActorRef snd, IActorRef recipient, s public TimeSpan TimeLeft { get { return When - DateTime.UtcNow; } } #region Overrides - + /// - public override bool Equals(object obj) => + public override bool Equals(object obj) => obj is Deadline deadline && Equals(deadline); /// diff --git a/src/core/Akka/Event/UnhandledMessage.cs b/src/core/Akka/Event/UnhandledMessage.cs index 72cddf1cf34..819f3d11352 100644 --- a/src/core/Akka/Event/UnhandledMessage.cs +++ b/src/core/Akka/Event/UnhandledMessage.cs @@ -10,9 +10,9 @@ namespace Akka.Event { /// - /// This class represents a message that was not handled by the recipient. + /// This message is published to the EventStream whenever an Actor receives a message it doesn't understand /// - public sealed class UnhandledMessage + public sealed class UnhandledMessage : AllDeadLetters, IWrappedMessage { /// /// Initializes a new instance of the class. @@ -21,25 +21,8 @@ public sealed class UnhandledMessage /// The actor that sent the message. /// The actor that was to receive the message. public UnhandledMessage(object message, IActorRef sender, IActorRef recipient) + : base(message, sender, recipient) { - Message = message; - Sender = sender; - Recipient = recipient; } - - /// - /// The original message that could not be handled. - /// - public object Message { get; private set; } - - /// - /// The actor that sent the message. - /// - public IActorRef Sender { get; private set; } - - /// - /// The actor that was to receive the message. - /// - public IActorRef Recipient { get; private set; } } } diff --git a/src/core/Akka/Routing/ConsistentHashRouter.cs b/src/core/Akka/Routing/ConsistentHashRouter.cs index 53b399e7104..5b201cfdb97 100644 --- a/src/core/Akka/Routing/ConsistentHashRouter.cs +++ b/src/core/Akka/Routing/ConsistentHashRouter.cs @@ -47,7 +47,7 @@ public interface IConsistentHashable /// This class represents a that can be wrapped around a message in order to make /// it hashable for use with or routers. /// - public sealed class ConsistentHashableEnvelope : RouterEnvelope, IConsistentHashable + public sealed class ConsistentHashableEnvelope : RouterEnvelope, IConsistentHashable, IWrappedMessage { /// /// Initializes a new instance of the class.