Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed IActorRef leak inside EventStream #5720

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3044,6 +3044,7 @@ namespace Akka.Event
public class EventStream : Akka.Event.LoggingBus
{
public EventStream(bool debug) { }
[System.ObsoleteAttribute("Should be removed in 1.5")]
public bool InitUnsubscriber(Akka.Actor.IActorRef unsubscriber, Akka.Actor.ActorSystem system) { }
public void StartUnsubscriber(Akka.Actor.Internal.ActorSystemImpl system) { }
public override bool Subscribe(Akka.Actor.IActorRef subscriber, System.Type channel) { }
Expand Down
66 changes: 66 additions & 0 deletions src/core/Akka.Tests/Event/Bugfix5717Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5717Specs.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Event
{
public class Bugfix5717Specs : AkkaSpec
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reproduction for #5717

{
public Bugfix5717Specs(ITestOutputHelper output) : base(Config.Empty, output){}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/5717
/// </summary>
[Fact]
public async Task Should_unsubscribe_from_all_topics_on_Terminate()
{
var es = Sys.EventStream;
var tm1 = 1;
var tm2 = "FOO";
var a1 = CreateTestProbe();
var a2 = CreateTestProbe();

es.Subscribe(a1.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(string));
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
a2.ExpectMsg(tm1);
a2.ExpectMsg(tm2);

// kill second test probe
Watch(a2);
Sys.Stop(a2);
ExpectTerminated(a2);

/*
* It's possible that the `Terminate` message may not have been processed by the
* Unsubscriber yet, so we want to try this operation more than once to see if it
* eventually executes the unsubscribe on the EventStream.
*
* If it still fails after multiple attempts, the issue is that the unsub was never
* executed in the first place.
*/
await AwaitAssertAsync(async () =>
{
await EventFilter.DeadLetter().ExpectAsync(0, () =>
{
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
});
}, interval:TimeSpan.FromSeconds(250));
}
}
}
142 changes: 41 additions & 101 deletions src/core/Akka/Event/EventBusUnsubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Annotations;
using Akka.Dispatch;
using Akka.Util.Internal;

namespace Akka.Event
{
Expand All @@ -26,7 +23,7 @@ namespace Akka.Event
/// watching a few actors too much - we opt for the 2nd choice here.
/// </summary>
[InternalApi]
class EventStreamUnsubscriber : ActorBase
internal class EventStreamUnsubscriber : ActorBase
{
private readonly EventStream _eventStream;
private readonly bool _debug;
Expand All @@ -45,140 +42,83 @@ public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool
_debug = debug;

}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>

protected override bool Receive(object message)
{
return message.Match().With<Register>(register =>
switch (message)
{
if (_debug)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code before was very old and still used PatternMatch. Refactored to a switch statement.

_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
}).With<Terminated>(terminated =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
})
.WasHandled;
case Register register:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"watching {register.Actor} in order to unsubscribe from EventStream when it terminates"));
Context.Watch(register.Actor);
break;
}
case UnregisterIfNoMoreSubscribedChannels unregister:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"unwatching {unregister.Actor} since has no subscriptions"));
Context.Unwatch(unregister.Actor);
break;
}
case Terminated terminated:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"unsubscribe {terminated.ActorRef} from {_eventStream}, because it was terminated"));
_eventStream.Unsubscribe(terminated.ActorRef);
break;
}
default:
return false;
}

return true;
}

/// <summary>
/// TBD
/// </summary>
protected override void PreStart()
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
_eventStream.Publish(new Debug(GetType().Name, GetType(),
string.Format("registering unsubscriber with {0}", _eventStream)));
_eventStream.InitUnsubscriber(Self, _system);
}

/// <summary>
/// TBD
/// INTERNAL API
///
/// Registers a new subscriber to be death-watched and automatically unsubscribed.
/// </summary>
internal class Register
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Register(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// </summary>
public IActorRef Actor { get; private set; }
}


/// <summary>
/// TBD
/// </summary>
internal class Terminated
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most probable cause of #5717 - we were handling our own Terminated message rather than the built-in one. This looks like a porting error that never was caught years ago.

{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Terminated(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// The actor we're going to deathwatch and automatically unsubscribe
/// </summary>
public IActorRef Actor { get; private set; }
}

/// <summary>
/// TBD
/// INTERNAL API
///
/// Unsubscribes an actor that is no longer subscribed and does not need to be death-watched any longer.
/// </summary>
internal class UnregisterIfNoMoreSubscribedChannels
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public UnregisterIfNoMoreSubscribedChannels(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// The actor we're no longer going to death watch.
/// </summary>
public IActorRef Actor { get; private set; }
}
}



/// <summary>
/// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names.
/// This is needed if someone spins up more EventStreams using the same ActorSystem,
/// each stream gets it's own unsubscriber.
/// </summary>
class EventStreamUnsubscribersProvider
{
private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0);
private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider();


/// <summary>
/// TBD
/// </summary>
public static EventStreamUnsubscribersProvider Instance
{
get { return _instance; }
}

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
/// <param name="eventStream">TBD</param>
/// <param name="debug">TBD</param>
public void Start(ActorSystemImpl system, EventStream eventStream, bool debug)
{
system.SystemActorOf(Props.Create<EventStreamUnsubscriber>(eventStream, system, debug).WithDispatcher(Dispatchers.InternalDispatcherId),
string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet()));
}
}
}
Loading