Skip to content

Commit

Permalink
Fixing premature pruning of Topics (#3322)
Browse files Browse the repository at this point in the history
* Fixing premature pruning of Topics

The DistributedPubSubMediator wasn't checking if the TopicActor was actually terminated before pruning it from the bucket.  This can cause problems if a TopicActor is re-suscribed to before being stopped.  The Subscribe message only checks Context.Child, but does not check if the bucket is still valid.  So it was possible to get in a state where subscribes/unsubscribes were succeeding, but any publishes to the topic where being dropped on the floor.

I've also switched from null to ActorRefs.Nobody.  Previously, if a Topic actor had terminated and a publish for that topic was received before the DistributedPubSubMediator did a prune, the publish would throw an exception.

* Switching to IsNobody() extension method
  • Loading branch information
joshgarnett authored and Aaronontheweb committed Feb 19, 2018
1 parent 3661b52 commit 4af1829
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
{
if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null)
if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !valueHolder.Ref.IsNobody())
{
Context.Unwatch(valueHolder.Ref);
PutToRegistry(remove.Path, null);
Expand Down Expand Up @@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
Receive<ClusterEvent.IMemberEvent>(_ => { /* ignore */ });
Receive<Count>(_ =>
{
var count = _registry.Sum(entry => entry.Value.Content.Count(kv => kv.Value.Ref != null));
var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !kv.Value.Ref.IsNobody()));
Sender.Tell(count);
});
Receive<DeltaCount>(_ =>
Expand Down Expand Up @@ -488,7 +488,7 @@ IEnumerable<IActorRef> Refs()

if (!(allButSelf && address == _cluster.SelfAddress) && bucket.Content.TryGetValue(path, out var valueHolder))
{
if (valueHolder != null && !Equals(valueHolder.Ref, ActorRefs.Nobody))
if (valueHolder != null && !valueHolder.Ref.IsNobody())
yield return valueHolder.Ref;
}
}
Expand Down Expand Up @@ -549,7 +549,7 @@ private void HandlePrune()
var bucket = entry.Value;

var oldRemoved = bucket.Content
.Where(kv => (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
.Where(kv => kv.Value.Ref.IsNobody() && (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
.Select(kv => kv.Key);

if (oldRemoved.Any())
Expand Down

0 comments on commit 4af1829

Please sign in to comment.