Skip to content

Commit

Permalink
Fixed IActorRef leak inside EventStream (#5720)
Browse files Browse the repository at this point in the history
* reproduced #5717

Reproduced `IActorRef` leak inside the `EventStream`

* cleaned up the `EventBusUnsubscriber`

* close #5719 - cleaned up `EventStream` subscription management

* added API approval

For `Obsolete` attribute.

* need to capture more data on why failures happen

* harden bugfix5717specs
  • Loading branch information
Aaronontheweb authored Mar 15, 2022
1 parent d055f46 commit b6214ef
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3044,6 +3044,7 @@ namespace Akka.Event
public class EventStream : Akka.Event.LoggingBus
{
public EventStream(bool debug) { }
[System.ObsoleteAttribute("Should be removed in 1.5")]
public bool InitUnsubscriber(Akka.Actor.IActorRef unsubscriber, Akka.Actor.ActorSystem system) { }
public void StartUnsubscriber(Akka.Actor.Internal.ActorSystemImpl system) { }
public override bool Subscribe(Akka.Actor.IActorRef subscriber, System.Type channel) { }
Expand Down
66 changes: 66 additions & 0 deletions src/core/Akka.Tests/Event/Bugfix5717Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5717Specs.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Event
{
public class Bugfix5717Specs : AkkaSpec
{
public Bugfix5717Specs(ITestOutputHelper output) : base(Config.Empty, output){}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/5717
/// </summary>
[Fact]
public async Task Should_unsubscribe_from_all_topics_on_Terminate()
{
var es = Sys.EventStream;
var tm1 = 1;
var tm2 = "FOO";
var a1 = CreateTestProbe();
var a2 = CreateTestProbe();

es.Subscribe(a1.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(string));
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
a2.ExpectMsg(tm1);
a2.ExpectMsg(tm2);

// kill second test probe
Watch(a2);
Sys.Stop(a2);
ExpectTerminated(a2);

/*
* It's possible that the `Terminate` message may not have been processed by the
* Unsubscriber yet, so we want to try this operation more than once to see if it
* eventually executes the unsubscribe on the EventStream.
*
* If it still fails after multiple attempts, the issue is that the unsub was never
* executed in the first place.
*/
await AwaitAssertAsync(async () =>
{
await EventFilter.DeadLetter().ExpectAsync(0, () =>
{
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
});
}, interval:TimeSpan.FromSeconds(250));
}
}
}
142 changes: 41 additions & 101 deletions src/core/Akka/Event/EventBusUnsubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Annotations;
using Akka.Dispatch;
using Akka.Util.Internal;

namespace Akka.Event
{
Expand All @@ -26,7 +23,7 @@ namespace Akka.Event
/// watching a few actors too much - we opt for the 2nd choice here.
/// </summary>
[InternalApi]
class EventStreamUnsubscriber : ActorBase
internal class EventStreamUnsubscriber : ActorBase
{
private readonly EventStream _eventStream;
private readonly bool _debug;
Expand All @@ -45,140 +42,83 @@ public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool
_debug = debug;

}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>

protected override bool Receive(object message)
{
return message.Match().With<Register>(register =>
switch (message)
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
}).With<Terminated>(terminated =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
})
.WasHandled;
case Register register:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"watching {register.Actor} in order to unsubscribe from EventStream when it terminates"));
Context.Watch(register.Actor);
break;
}
case UnregisterIfNoMoreSubscribedChannels unregister:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"unwatching {unregister.Actor} since has no subscriptions"));
Context.Unwatch(unregister.Actor);
break;
}
case Terminated terminated:
{
if (_debug)
_eventStream.Publish(new Debug(GetType().Name, GetType(),
$"unsubscribe {terminated.ActorRef} from {_eventStream}, because it was terminated"));
_eventStream.Unsubscribe(terminated.ActorRef);
break;
}
default:
return false;
}

return true;
}

/// <summary>
/// TBD
/// </summary>
protected override void PreStart()
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
_eventStream.Publish(new Debug(GetType().Name, GetType(),
string.Format("registering unsubscriber with {0}", _eventStream)));
_eventStream.InitUnsubscriber(Self, _system);
}

/// <summary>
/// TBD
/// INTERNAL API
///
/// Registers a new subscriber to be death-watched and automatically unsubscribed.
/// </summary>
internal class Register
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Register(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// </summary>
public IActorRef Actor { get; private set; }
}


/// <summary>
/// TBD
/// </summary>
internal class Terminated
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Terminated(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// The actor we're going to deathwatch and automatically unsubscribe
/// </summary>
public IActorRef Actor { get; private set; }
}

/// <summary>
/// TBD
/// INTERNAL API
///
/// Unsubscribes an actor that is no longer subscribed and does not need to be death-watched any longer.
/// </summary>
internal class UnregisterIfNoMoreSubscribedChannels
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public UnregisterIfNoMoreSubscribedChannels(IActorRef actor)
{
Actor = actor;
}

/// <summary>
/// TBD
/// The actor we're no longer going to death watch.
/// </summary>
public IActorRef Actor { get; private set; }
}
}



/// <summary>
/// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names.
/// This is needed if someone spins up more EventStreams using the same ActorSystem,
/// each stream gets it's own unsubscriber.
/// </summary>
class EventStreamUnsubscribersProvider
{
private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0);
private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider();


/// <summary>
/// TBD
/// </summary>
public static EventStreamUnsubscribersProvider Instance
{
get { return _instance; }
}

/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
/// <param name="eventStream">TBD</param>
/// <param name="debug">TBD</param>
public void Start(ActorSystemImpl system, EventStream eventStream, bool debug)
{
system.SystemActorOf(Props.Create<EventStreamUnsubscriber>(eventStream, system, debug).WithDispatcher(Dispatchers.InternalDispatcherId),
string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet()));
}
}
}
Loading

0 comments on commit b6214ef

Please sign in to comment.