From 6030ac1dfe48827bf40a8c7f5a5da87674ce75cb Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Sat, 10 Feb 2018 16:20:20 -0500 Subject: [PATCH 1/4] Fixing premature pruning of Topics The DistributedPubSubMediator wasn't checking if the TopicActor was actually terminated before pruning it from the bucket. This can cause problems if a TopicActor is re-suscribed to before being stopped. The Subscribe message only checks Context.Child, but does not check if the bucket is still valid. So it was possible to get in a state where subscribes/unsubscribes were succeeding, but any publishes to the topic where being dropped on the floor. I've also switched from null to ActorRefs.Nobody. Previously, if a Topic actor had terminated and a publish for that topic was received before the DistributedPubSubMediator did a prune, the publish would throw an exception. --- .../PublishSubscribe/DistributedPubSubMediator.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 949d73f0d3b..d3801e02fe6 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -223,7 +223,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null) { Context.Unwatch(valueHolder.Ref); - PutToRegistry(remove.Path, null); + PutToRegistry(remove.Path, ActorRefs.Nobody); } } }); @@ -329,7 +329,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref)) - PutToRegistry(key, null); // remove + PutToRegistry(key, ActorRefs.Nobody); // remove _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewTopicActor(terminated.ActorRef.Path.Name)); }); @@ -549,7 +549,7 @@ private void HandlePrune() var bucket = entry.Value; var oldRemoved = bucket.Content - .Where(kv => (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) + .Where(kv => Equals(kv.Value.Ref, ActorRefs.Nobody) && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) .Select(kv => kv.Key); if (oldRemoved.Any()) From f9db75ef1daed46b3ebc2a2d0dc8620c8e591f72 Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Sat, 10 Feb 2018 18:19:18 -0500 Subject: [PATCH 2/4] Fixing a few Ref comparisons --- .../PublishSubscribe/DistributedPubSubMediator.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index d3801e02fe6..6005115ceb4 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -220,7 +220,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) { - if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null) + if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !Equals(valueHolder.Ref, ActorRefs.Nobody)) { Context.Unwatch(valueHolder.Ref); PutToRegistry(remove.Path, ActorRefs.Nobody); @@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(_ => { /* ignore */ }); Receive(_ => { - var count = _registry.Sum(entry => entry.Value.Content.Count(kv => kv.Value.Ref != null)); + var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !Equals(kv.Value.Ref, ActorRefs.Nobody))); Sender.Tell(count); }); Receive(_ => From 112fe45c811dd51ba33557f0812a76ac60d5f4ad Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Sun, 11 Feb 2018 13:33:47 -0500 Subject: [PATCH 3/4] Switching to IsNobody() extension method --- .../PublishSubscribe/DistributedPubSubMediator.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 6005115ceb4..7ed3dbd428e 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -220,7 +220,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) { if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) { - if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !Equals(valueHolder.Ref, ActorRefs.Nobody)) + if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody()) { Context.Unwatch(valueHolder.Ref); PutToRegistry(remove.Path, ActorRefs.Nobody); @@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) Receive(_ => { /* ignore */ }); Receive(_ => { - var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !Equals(kv.Value.Ref, ActorRefs.Nobody))); + var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !kv.Value.Ref.IsNobody())); Sender.Tell(count); }); Receive(_ => @@ -488,7 +488,7 @@ IEnumerable Refs() if (!(allButSelf && address == _cluster.SelfAddress) && bucket.Content.TryGetValue(path, out var valueHolder)) { - if (valueHolder != null && !Equals(valueHolder.Ref, ActorRefs.Nobody)) + if (valueHolder != null && !valueHolder.Ref.IsNobody()) yield return valueHolder.Ref; } } @@ -549,7 +549,7 @@ private void HandlePrune() var bucket = entry.Value; var oldRemoved = bucket.Content - .Where(kv => Equals(kv.Value.Ref, ActorRefs.Nobody) && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) + .Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds) .Select(kv => kv.Key); if (oldRemoved.Any()) From 1b2fed3de6aa420128162e332426dba8027abc3e Mon Sep 17 00:00:00 2001 From: Joshua Garnett Date: Wed, 14 Feb 2018 16:48:57 -0800 Subject: [PATCH 4/4] Swapping back to null to see if that is causing some problems --- .../PublishSubscribe/DistributedPubSubMediator.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs index 7ed3dbd428e..ea3d7a499c9 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs @@ -223,7 +223,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody()) { Context.Unwatch(valueHolder.Ref); - PutToRegistry(remove.Path, ActorRefs.Nobody); + PutToRegistry(remove.Path, null); } } }); @@ -329,7 +329,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket)) if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref)) - PutToRegistry(key, ActorRefs.Nobody); // remove + PutToRegistry(key, null); // remove _buffer.RecreateAndForwardMessagesIfNeeded(key, () => NewTopicActor(terminated.ActorRef.Path.Name)); });