Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DeadLetterSuppression for ActorSelection and other deadletter enhancements #4889

Merged
merged 1 commit into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public MessageExtractor(int maxNumberOfShards)
/// and have the message types themselves carry identifiers.
/// </para>
/// </summary>
public sealed class ShardingEnvelope
public sealed class ShardingEnvelope: IWrappedMessage
{
public string EntityId { get; }
public object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class Publish : IDistributedPubSubMessage, IEquatable<Publish>
public sealed class Publish : IDistributedPubSubMessage, IEquatable<Publish>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -420,7 +420,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class Send : IDistributedPubSubMessage, IEquatable<Send>
public sealed class Send : IDistributedPubSubMessage, IEquatable<Send>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -487,7 +487,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class SendToAll : IDistributedPubSubMessage, IEquatable<SendToAll>
public sealed class SendToAll : IDistributedPubSubMessage, IEquatable<SendToAll>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ namespace Akka.Cluster.Sharding
public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithRole(string role) { }
public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithShardingSettings(Akka.Cluster.Sharding.ClusterShardingSettings shardingSettings) { }
}
public sealed class ShardingEnvelope
public sealed class ShardingEnvelope : Akka.Actor.IWrappedMessage
{
public ShardingEnvelope(string entityId, object message) { }
public string EntityId { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public static Akka.Cluster.Tools.PublishSubscribe.GetTopics Instance { get; }
}
public interface IDistributedPubSubMessage { }
public sealed class Publish : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Publish>
public sealed class Publish : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Publish>
{
public Publish(string topic, object message, bool sendOneMessageToEachGroup = False) { }
public object Message { get; }
Expand Down Expand Up @@ -258,7 +258,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class Send : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Send>
public sealed class Send : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Send>
{
public Send(string path, object message, bool localAffinity = False) { }
public bool LocalAffinity { get; }
Expand All @@ -269,7 +269,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class SendToAll : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.SendToAll>
public sealed class SendToAll : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.SendToAll>
{
public SendToAll(string path, object message, bool excludeSelf = False) { }
public bool ExcludeSelf { get; }
Expand Down
25 changes: 18 additions & 7 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ namespace Akka.Actor
public void Tell(object message, Akka.Actor.IActorRef sender = null) { }
public override string ToString() { }
}
public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful
public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Actor.IWrappedMessage
{
public ActorSelectionMessage(object message, Akka.Actor.SelectionPathElement[] elements, bool wildCardFanOut = False) { }
public Akka.Actor.SelectionPathElement[] Elements { get; }
Expand Down Expand Up @@ -1145,6 +1145,10 @@ namespace Akka.Actor
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWrappedMessage
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

{
object Message { get; }
}
public sealed class Identify : Akka.Actor.IAutoReceivedMessage, Akka.Actor.INotInfluenceReceiveTimeout
{
public Identify(object messageId) { }
Expand Down Expand Up @@ -1797,6 +1801,10 @@ namespace Akka.Actor
protected void RunTask(System.Func<System.Threading.Tasks.Task> action) { }
}
public delegate void UntypedReceive(object message);
public class static WrappedMessage
{
public static object Unwrap(object message) { }
}
}
namespace Akka.Actor.Dsl
{
Expand Down Expand Up @@ -2879,7 +2887,7 @@ namespace Akka.Event
{
protected ActorEventBus() { }
}
public abstract class AllDeadLetters
public abstract class AllDeadLetters : Akka.Actor.IWrappedMessage
{
protected AllDeadLetters(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public object Message { get; }
Expand Down Expand Up @@ -2933,6 +2941,12 @@ namespace Akka.Event
protected virtual void Print(Akka.Event.LogEvent logEvent) { }
protected override bool Receive(object message) { }
}
public sealed class Dropped : Akka.Event.AllDeadLetters
{
public Dropped(object message, string reason, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public Dropped(object message, string reason, Akka.Actor.IActorRef recipient) { }
public string Reason { get; }
}
public class DummyClassForStringSources
{
public DummyClassForStringSources() { }
Expand Down Expand Up @@ -3155,12 +3169,9 @@ namespace Akka.Event
public TraceLogger() { }
protected override void OnReceive(object message) { }
}
public sealed class UnhandledMessage
public sealed class UnhandledMessage : Akka.Event.AllDeadLetters, Akka.Actor.IWrappedMessage
{
public UnhandledMessage(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public object Message { get; }
public Akka.Actor.IActorRef Recipient { get; }
public Akka.Actor.IActorRef Sender { get; }
}
public class Warning : Akka.Event.LogEvent
{
Expand Down Expand Up @@ -4088,7 +4099,7 @@ namespace Akka.Routing
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Routing.IConsistentHashable
public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Actor.IWrappedMessage, Akka.Routing.IConsistentHashable
{
public ConsistentHashableEnvelope(object message, object hashKey) { }
public object ConsistentHashKey { get; }
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
allDeadLetter.Recipient.Should().Be(deadActor);

allListener.ExpectNoMsg(200.Milliseconds());

// unwrap for ActorSelection
Sys.ActorSelection(deadActor.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(deadActor.Path).Tell(new NormalMessage());

// the recipient ref isn't the same as deadActor here so only checking the message
deadLetter = deadListener.ExpectMsg<DeadLetter>();//
deadLetter.Message.Should().BeOfType<NormalMessage>();
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
}

[Fact]
Expand Down Expand Up @@ -123,6 +136,22 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
deadListener.ExpectNoMsg(TimeSpan.Zero);
suppressedListener.ExpectNoMsg(TimeSpan.Zero);
allListener.ExpectNoMsg(TimeSpan.Zero);

// unwrap for ActorSelection
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new NormalMessage());

deadLetter = deadListener.ExpectMsg<DeadLetter>();
deadLetter.Message.Should().BeOfType<NormalMessage>();
deadLetter.Sender.Should().Be(TestActor);
deadLetter.Recipient.Should().Be(Sys.DeadLetters);
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
suppressedDeadLetter.Sender.Should().Be(TestActor);
suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
}
}
}
68 changes: 58 additions & 10 deletions src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,54 @@
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Xunit;

namespace Akka.Tests.Actor
{
public class DeadLetterSuspensionSpec : AkkaSpec
{
private class Dropping : ActorBase
{
public static Props Props() => Akka.Actor.Props.Create(() => new Dropping());

protected override bool Receive(object message)
{
switch (message)
{
case int n:
Context.System.EventStream.Publish(new Dropped(n, "Don't like numbers", Self));
return true;
}
return false;
}
}

private class Unandled : ActorBase
{
public static Props Props() => Akka.Actor.Props.Create(() => new Unandled());

protected override bool Receive(object message)
{
switch (message)
{
case int n:
Unhandled(n);
return true;
}
return false;
}
}

private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.log-dead-letters = 3
akka.log-dead-letters = 4
akka.log-dead-letters-suspend-duration = 2s");

private readonly IActorRef _deadActor;
private readonly IActorRef _droppingActor;
private readonly IActorRef _unhandledActor;

public DeadLetterSuspensionSpec()
: base(Config)
Expand All @@ -29,10 +64,20 @@ public DeadLetterSuspensionSpec()
Watch(_deadActor);
_deadActor.Tell(PoisonPill.Instance);
ExpectTerminated(_deadActor);

_droppingActor = Sys.ActorOf(Dropping.Props(), "droppingActor");
_unhandledActor = Sys.ActorOf(Unandled.Props(), "unhandledActor");
}

private string ExpectedDeadLettersLogMessage(int count) =>
$"Message [{count.GetType().Name}] from {TestActor.Path} to {_deadActor.Path} was not delivered. [{count}] dead letters encountered";
$"Message [{count.GetType().Name}] from {TestActor} to {_deadActor} was not delivered. [{count}] dead letters encountered";

private string ExpectedDroppedLogMessage(int count) =>
$"Message [{count.GetType().Name}] to {_droppingActor} was dropped. Don't like numbers. [{count}] dead letters encountered";

private string ExpectedUnhandledLogMessage(int count) =>
$"Message [{count.GetType().Name}] from {TestActor} to {_unhandledActor} was unhandled. [{count}] dead letters encountered";


[Fact]
public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letters_and_then_re_enable()
Expand All @@ -42,28 +87,31 @@ public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letter
.Expect(1, () => _deadActor.Tell(1));

EventFilter
.Info(start: ExpectedDeadLettersLogMessage(2))
.Expect(1, () => _deadActor.Tell(2));
.Info(start: ExpectedDroppedLogMessage(2))
.Expect(1, () => _droppingActor.Tell(2));

EventFilter
.Info(start: ExpectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next")
.Expect(1, () => _deadActor.Tell(3));
.Info(start: ExpectedUnhandledLogMessage(3))
.Expect(1, () => _unhandledActor.Tell(3));

_deadActor.Tell(4);
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next")
.Expect(1, () => _deadActor.Tell(4));
_deadActor.Tell(5);
_droppingActor.Tell(6);

// let suspend-duration elapse
Thread.Sleep(2050);

// re-enabled
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(6) + ", of which 2 were not logged")
.Expect(1, () => _deadActor.Tell(6));
.Info(start: ExpectedDeadLettersLogMessage(7) + ", of which 2 were not logged")
.Expect(1, () => _deadActor.Tell(7));

// reset count
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(1))
.Expect(1, () => _deadActor.Tell(7));
.Expect(1, () => _deadActor.Tell(8));
}
}
}
1 change: 1 addition & 0 deletions src/core/Akka.Tests/Event/EventStreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ private static string GetDebugUnhandledMessagesConfig()
akka {
actor.serialize-messages = off
actor.debug.unhandled = on
log-dead-letters = off
stdout-loglevel = DEBUG
loglevel = DEBUG
loggers = [""%logger%""]
Expand Down
Loading