Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClusterSingletonManager should ignore FSM events during shutdown #3263

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
//-----------------------------------------------------------------------
// <copyright file="CoordinatedShutdownShardingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Sharding.Tests
{
/// <summary>
/// Starts three actor systems with cluster sharding in one test process and checks that
/// the coordinated-shutdown task registered on a node runs when that node leaves the
/// cluster or is downed, while entities stay reachable through the region on sys2.
/// </summary>
public class CoordinatedShutdownShardingSpec : AkkaSpec
{
    private static readonly Config SpecConfig;

    private readonly ActorSystem _sys1;
    private readonly ActorSystem _sys2;
    private readonly ActorSystem _sys3;

    private readonly IActorRef _region1;
    private readonly IActorRef _region2;
    private readonly IActorRef _region3;

    private readonly TestProbe _probe1;
    private readonly TestProbe _probe2;
    private readonly TestProbe _probe3;

    // The entity id is the string form of the message; the message itself is passed through unchanged.
    private readonly ExtractEntityId _extractEntityId = message => Tuple.Create(message.ToString(), message);

    // The shard id is derived from the message hash code modulo 10.
    private readonly ExtractShardId _extractShard = message => (message.GetHashCode() % 10).ToString();

    /// <summary>
    /// Entity actor that answers every message by sending the same message back to the sender.
    /// </summary>
    private class EchoActor : ReceiveActor
    {
        public EchoActor()
        {
            ReceiveAny(message => Sender.Tell(message));
        }
    }

    static CoordinatedShutdownShardingSpec()
    {
        var overrides = ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0");

        // singleton defaults take precedence over sharding defaults, both sit below the overrides
        var defaults = ClusterSingletonManager.DefaultConfig()
            .WithFallback(ClusterSharding.DefaultConfig());

        SpecConfig = overrides.WithFallback(defaults);
    }

    /// <summary>
    /// Creates the three systems (sys3 is the test kit's own system), starts a "type1" shard
    /// region on each, and registers a coordinated-shutdown task per system that notifies
    /// the matching probe.
    /// </summary>
    /// <param name="helper">xUnit output sink handed to the base spec.</param>
    public CoordinatedShutdownShardingSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
    {
        _sys1 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
        _sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
        _sys3 = Sys;

        var echoProps = Props.Create(() => new EchoActor());
        _region1 = StartRegion(_sys1, echoProps);
        _region2 = StartRegion(_sys2, echoProps);
        _region3 = StartRegion(_sys3, echoProps);

        _probe1 = CreateTestProbe(_sys1);
        _probe2 = CreateTestProbe(_sys2);
        _probe3 = CreateTestProbe(_sys3);

        NotifyOnServiceUnbind(_sys1, _probe1, "CS-unbind-1");
        NotifyOnServiceUnbind(_sys2, _probe2, "CS-unbind-2");
        NotifyOnServiceUnbind(_sys3, _probe3, "CS-unbind-3");
    }

    /// <summary>
    /// Starts the "type1" shard region on <paramref name="system"/> using the spec's extractors.
    /// </summary>
    private IActorRef StartRegion(ActorSystem system, Props entityProps)
    {
        var sharding = ClusterSharding.Get(system);
        var settings = ClusterShardingSettings.Create(system);
        return sharding.Start("type1", entityProps, settings, _extractEntityId, _extractShard);
    }

    /// <summary>
    /// Registers an "unbind" task in the before-service-unbind phase of <paramref name="system"/>
    /// that sends <paramref name="notification"/> to <paramref name="probe"/>.
    /// </summary>
    private static void NotifyOnServiceUnbind(ActorSystem system, TestProbe probe, string notification)
    {
        CoordinatedShutdown.Get(system).AddTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind", () =>
        {
            probe.Ref.Tell(notification);
            return Task.FromResult(Done.Instance);
        });
    }

    /// <summary>
    /// Shuts down the two additionally created systems before the test kit terminates its own.
    /// </summary>
    protected override void BeforeTermination()
    {
        foreach (var system in new[] { _sys1, _sys2 })
        {
            Shutdown(system);
        }
    }

    /// <summary>
    /// Sends 1, 2 and 3 through region 2 and expects each to be echoed back.
    /// Region 2 is used because it is not shut down in either scenario.
    /// </summary>
    private void PingEntities()
    {
        foreach (var entity in new[] { 1, 2, 3 })
        {
            _region2.Tell(entity, _probe2.Ref);
            _probe2.ExpectMsg<int>(10.Seconds()).Should().Be(entity);
        }
    }

    [Fact]
    public void Sharding_and_CoordinatedShutdown_must_run_successfully()
    {
        InitCluster();
        RunCoordinatedShutdownWhenLeaving();
        RunCoordinatedShutdownWhenDowning();
    }

    /// <summary>
    /// Repeats <paramref name="assertion"/> until it passes, bounded by a 10 second window.
    /// </summary>
    private void AwaitAssertWithinTenSeconds(Action assertion)
    {
        Within(10.Seconds(), () =>
        {
            AwaitAssert(assertion);
        });
    }

    /// <summary>
    /// Asserts that <paramref name="system"/> sees exactly <paramref name="expectedCount"/>
    /// members and that all of them are <see cref="MemberStatus.Up"/>.
    /// </summary>
    private static void AssertAllMembersUp(ActorSystem system, int expectedCount)
    {
        var cluster = Cluster.Get(system);
        cluster.State.Members.Count.Should().Be(expectedCount);
        cluster.State.Members.All(x => x.Status == MemberStatus.Up).Should().BeTrue();
    }

    /// <summary>
    /// Asserts that both the cluster extension and the actor system itself have terminated.
    /// </summary>
    private static void AssertTerminated(ActorSystem system)
    {
        Cluster.Get(system).IsTerminated.Should().BeTrue();
        system.WhenTerminated.IsCompleted.Should().BeTrue();
    }

    /// <summary>
    /// Forms the cluster: sys2 joins itself, then sys1 joins via sys2, then sys3 joins via sys1.
    /// </summary>
    private void InitCluster()
    {
        // FIXME this test should also work when coordinator is on the leaving sys1 node,
        // but currently there seems to be a race between the CS and the ClusterSingleton observing OldestChanged
        // and terminating coordinator singleton before the graceful sharding stop is done.

        Cluster.Get(_sys2).Join(Cluster.Get(_sys2).SelfAddress); // coordinator will initially run on sys2
        AwaitAssert(() => Cluster.Get(_sys2).SelfMember.Status.Should().Be(MemberStatus.Up));

        Cluster.Get(_sys1).Join(Cluster.Get(_sys2).SelfAddress);
        AwaitAssertWithinTenSeconds(() =>
        {
            AssertAllMembersUp(_sys1, 2);
            AssertAllMembersUp(_sys2, 2);
        });

        Cluster.Get(_sys3).Join(Cluster.Get(_sys1).SelfAddress);
        AwaitAssertWithinTenSeconds(() =>
        {
            AssertAllMembersUp(_sys1, 3);
            AssertAllMembersUp(_sys2, 3);
            AssertAllMembersUp(_sys3, 3);
        });

        PingEntities();
    }

    /// <summary>
    /// Makes sys1 leave the cluster and verifies its shutdown task ran, the remaining
    /// nodes see two members, sys1 terminated, and entities are still reachable.
    /// </summary>
    private void RunCoordinatedShutdownWhenLeaving()
    {
        Cluster.Get(_sys3).Leave(Cluster.Get(_sys1).SelfAddress);
        _probe1.ExpectMsg("CS-unbind-1");

        AwaitAssertWithinTenSeconds(() =>
        {
            Cluster.Get(_sys2).State.Members.Count.Should().Be(2);
            Cluster.Get(_sys3).State.Members.Count.Should().Be(2);
        });

        AwaitAssertWithinTenSeconds(() => AssertTerminated(_sys1));

        PingEntities();
    }

    /// <summary>
    /// Downs sys3 from sys2 and verifies its shutdown task ran, sys2 is the only member
    /// left, sys3 terminated, and entities are still reachable.
    /// </summary>
    private void RunCoordinatedShutdownWhenDowning()
    {
        // coordinator is on Sys2
        Cluster.Get(_sys2).Down(Cluster.Get(_sys3).SelfAddress);
        _probe3.ExpectMsg("CS-unbind-3");

        AwaitAssertWithinTenSeconds(() =>
        {
            Cluster.Get(_sys2).State.Members.Count.Should().Be(1);
        });

        AwaitAssertWithinTenSeconds(() => AssertTerminated(_sys3));

        PingEntities();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ private void Remove(Member member)

/// <summary>
/// Dequeues the next entry from <c>_changes</c> and sends it to the parent actor,
/// but only while the cluster extension of this node has not terminated.
/// </summary>
private void SendFirstChange()
{
    // don't send cluster change events if this node is shutting itself down, just wait for SelfExiting
    if (!_cluster.IsTerminated)
    {
        object change;
        _changes = _changes.Dequeue(out change);
        Context.Parent.Tell(change);
    }
}

/// <inheritdoc cref="ActorBase.PreStart"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace Akka.Cluster
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
public Akka.Cluster.UniqueAddress SelfUniqueAddress { get; }
public Akka.Cluster.ClusterSettings Settings { get; }
Expand Down
5 changes: 5 additions & 0 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ public ImmutableHashSet<string> SelfRoles
/// </summary>
public ClusterEvent.CurrentClusterState State { get { return _readView._state; } }

/// <summary>
/// The <see cref="Member"/> entry for this node, as exposed by the cluster read view.
/// </summary>
public Member SelfMember
{
    get { return _readView.Self; }
}

private readonly AtomicBoolean _isTerminated = new AtomicBoolean(false);

/// <summary>
Expand Down