Fixing premature pruning of Topics #3322
Conversation
The DistributedPubSubMediator wasn't checking whether the TopicActor had actually terminated before pruning it from the bucket. This can cause problems if a TopicActor is re-subscribed to before being stopped: the Subscribe message only checks Context.Child, but does not check whether the bucket entry is still valid. So it was possible to get into a state where subscribes/unsubscribes were succeeding, but any publishes to the topic were being dropped on the floor.

I've also switched from null to ActorRefs.Nobody. Previously, if a Topic actor had terminated and a publish for that topic was received before the DistributedPubSubMediator did a prune, the publish would throw an exception.
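To make the failure mode concrete, here's a minimal sketch of the Subscribe path described above, assuming a simplified handler shape (the real mediator differs; the topic-name encoding and the NewTopicActor helper are illustrative, not actual mediator internals):

```csharp
// Illustrative sketch only, not the mediator's actual implementation.
// Context.Child only answers "does a TopicActor exist right now?"; nothing
// here revalidates the registry bucket entry. If the old TopicActor
// terminated and the topic was re-subscribed before the prune tick, the
// stale bucket entry still gets pruned: subscribes keep succeeding against
// the new child, while publishes, which route via the bucket, are dropped.
Receive<Subscribe>(subscribe =>
{
    var encodedTopic = Uri.EscapeDataString(subscribe.Topic); // assumed encoding
    var child = Context.Child(encodedTopic);
    if (!child.IsNobody())
        child.Forward(subscribe);                        // existing TopicActor
    else
        NewTopicActor(encodedTopic).Forward(subscribe);  // hypothetical helper
});
```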
My guess is whoever did the initial port misunderstood how Scala pattern matching works. From https://github.com/akka/akka/blob/master/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala#L835
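For reference, a hedged illustration of the mismatch: in the Scala original a bucket entry is a ValueHolder whose ref is an Option[ActorRef], so a removed topic is an explicit None that every call site must pattern-match on. The C# port flattened that Option into a nullable IActorRef, which is the gap this PR closes with the ActorRefs.Nobody sentinel (the class shape below is simplified):

```csharp
// Simplified shape, mirroring the Scala Option[ActorRef] semantics:
//   Some(ref) -> live topic actor; None -> tombstoned entry.
// The port used `null` as the tombstone, so any call site missing a
// null check threw. Storing ActorRefs.Nobody makes the tombstone an
// explicit, non-null value that is safe to compare and send to.
public sealed class ValueHolder
{
    public long Version { get; }
    public IActorRef Ref { get; }       // ActorRefs.Nobody == "removed"

    public ValueHolder(long version, IActorRef @ref)
    {
        Version = version;
        Ref = @ref ?? ActorRefs.Nobody; // never persist a raw null
    }
}
```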
Looks like we have a failure in the multi-node spec. I'll take a look at the source in a moment here and see what's up.
(Could be that the spec itself implements the same "misinterpretation" of the Scala code as the underlying source did.)
This appears to be the offending line of code: Lines 430 to 442 in 72d281d. Should see the number of publishers drop from 5 to 4 with this change, but that's no longer the case.
Couple of stylistic changes. Still taking a look at the code to see what could be causing the spec to fail...
```diff
@@ -220,10 +220,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
 {
     if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
     {
-        if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null)
+        if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !Equals(valueHolder.Ref, ActorRefs.Nobody))
```
Use the IsNobody() extension method here IMHO. It'll perform a null check and this reference comparison to ActorRefs.Nobody as well.
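For example, the hunk above would then read as follows (a sketch, assuming IsNobody() treats both null and Nobody as "nobody"):

```csharp
// IsNobody() subsumes the null check and the Nobody comparison, so the
// condition states intent instead of mechanics.
if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody())
{
    // the entry still refers to a live topic actor
}
```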
```diff
@@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
 Receive<ClusterEvent.IMemberEvent>(_ => { /* ignore */ });
 Receive<Count>(_ =>
 {
-    var count = _registry.Sum(entry => entry.Value.Content.Count(kv => kv.Value.Ref != null));
+    var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !Equals(kv.Value.Ref, ActorRefs.Nobody)));
```
IsNobody() again.
```diff
@@ -329,7 +329,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
 if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
     if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref))
-        PutToRegistry(key, null); // remove
+        PutToRegistry(key, ActorRefs.Nobody); // remove
```
Like the change
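For context on why the sentinel matters on the publish path: a publish that arrives between the Terminated and the next prune tick reads the stored ref, and with null that read threw. A minimal sketch, assuming a simplified publish handler (topicPath, message, and sender stand in for the real locals):

```csharp
// Illustrative publish path. Pre-PR, holder.Ref could be null in the
// window between a topic terminating and the next prune, so touching the
// ref threw. With ActorRefs.Nobody the tombstoned entry is skipped
// explicitly and a racing publish is dropped cleanly instead of crashing.
if (bucket.Content.TryGetValue(topicPath, out var holder) && !holder.Ref.IsNobody())
    holder.Ref.Tell(message, sender);
```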
Looks like the issue occurs earlier in the spec, when node 2 goes down as a result of a normal
Offending code: Lines 420 to 424 in 72d281d
On a whim, going to re-run this and see if the issue is a race condition or not. I don't think so though: it failed in both the .NET 4.6.1 and .NET Core 1.1 implementations.
Node 3 failed in the same spot as node 2. So only node 1's count was accurate after the
@joshgarnett sure looks like the issue was the
Looks like the reason why the https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/Actor/ActorRef.cs#L593 Haven't written a test for it, but I'm pretty sure that calling
Going to merge this to get the bugfix in; don't think the juice is worth the squeeze in trying to make the serialization / deserialization of
* Fixing premature pruning of Topics
* Switching to IsNobody() extension method