From 4af182952674a091c677fce685b866b0fd27e264 Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Thu, 15 Feb 2018 03:52:11 -0800 Subject: [PATCH] Fixing premature pruning of Topics (#3322) * Fixing premature pruning of Topics The DistributedPubSubMediator wasn't checking if the TopicActor was actually terminated before pruning it from the bucket. This can cause problems if a TopicActor is re-suscribed to before being stopped. The Subscribe message only checks Context.Child, but does not check if the bucket is still valid. So it was possible to get in a state where subscribes/unsubscribes were succeeding, but any publishes to the topic where being dropped on the floor. I've also switched from null to ActorRefs.Nobody. Previously, if a Topic actor had terminated and a publish for that topic was received before the DistributedPubSubMediator did a prune, the publish would throw an exception. * Switching to IsNobody() extension method --- .../PublishSubscribe/DistributedPubSubMediator.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 949d73f0d3b..ea3d7a499c9 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -220,7 +220,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) { - if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null) + if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody()) { Context.Unwatch(valueHolder.Ref); PutToRegistry(remove.Path, null); @@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(_ => { /* ignore */ }); Receive(_ => { - var count = _registry.Sum(entry => entry.Value.Content.Count(kv => kv.Value.Ref != null)); + var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !kv.Value.Ref.IsNobody())); Sender.Tell(count); }); Receive(_ => @@ -488,7 +488,7 @@ IEnumerable Refs() if (!(allButSelf && address == _cluster.SelfAddress) && bucket.Content.TryGetValue(path, out var valueHolder)) { - if (valueHolder != null && !Equals(valueHolder.Ref, ActorRefs.Nobody)) + if (valueHolder != null && !valueHolder.Ref.IsNobody()) yield return valueHolder.Ref; } } @@ -549,7 +549,7 @@ private void HandlePrune() var bucket = entry.Value; var oldRemoved = bucket.Content - .Where(kv => (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) + .Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) .Select(kv => kv.Key); if (oldRemoved.Any())