Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustered routers now load correct default number of routees #2310

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ Target "MultiNodeTests" <| fun _ ->
|> append assembly
|> append "-Dmultinode.enable-filesink=on"
|> append (sprintf "-Dmultinode.output-directory=\"%s\"" testOutput)
|> appendIfNotNullOrEmpty spec "-Dmultinode.test-spec="
|> appendIfNotNullOrEmpty spec "-Dmultinode.spec="
Copy link
Member Author

@Aaronontheweb Aaronontheweb Sep 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allows the ability for the MNTR to correctly run a single spec from the FAKE commandline, i.e.

 ./build.cmd multinodetests spec=ClusterBroadcastGroup

|> toText

let result = ExecProcess(fun info ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ namespace Akka.Actor
}
public class Deployer
{
protected readonly Akka.Configuration.Config Default;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be here as an instance field? It looks repetitive - maybe place it somewhere else or use as a static?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a private field before. Doesn't need to be static since there's only ever 1 instance per ActorSystem, but multiple actor systems could have different values for this field depending on which plugins they loaded (although it looks like only Akka core and Akka.Cluster really use it at the moment.)

public Deployer(Akka.Actor.Settings settings) { }
public Akka.Actor.Deploy Lookup(Akka.Actor.ActorPath path) { }
public Akka.Actor.Deploy Lookup(System.Collections.Generic.IEnumerable<string> path) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
<Compile Include="RestartNode2Spec.cs" />
<Compile Include="RestartNode3Spec.cs" />
<Compile Include="RestartNodeSpec.cs" />
<Compile Include="Routing\ClusterBroadcastRouter2266BugfixSpec.cs" />
<Compile Include="Routing\ClusterConsistentHashingGroupSpec.cs" />
<Compile Include="Routing\ClusterConsistentHashingRouterSpec.cs" />
<Compile Include="Routing\ClusterRoundRobinSpec.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterBroadcastRouter2266BugfixSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Routing;
using FluentAssertions;

namespace Akka.Cluster.Tests.MultiNode.Routing
{
public class ClusterBroadcastGroupSpecConfig : MultiNodeConfig
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multinode spec from @mrrd, based on the round-robin group clustered router MNTR spec.

Verifies that the router has the correct number of routees and that ALL of them receive each message.

{
internal interface IRouteeType { }
internal class PoolRoutee : IRouteeType { }
internal class GroupRoutee : IRouteeType { }

internal class Reply
{
public Reply(IRouteeType routeeType, IActorRef actorRef)
{
RouteeType = routeeType;
ActorRef = actorRef;
}

public IRouteeType RouteeType { get; }

public IActorRef ActorRef { get; }
}

internal class SomeActor : ReceiveActor
{
private readonly IRouteeType _routeeType;

public SomeActor() : this(new PoolRoutee())
{
}

public SomeActor(IRouteeType routeeType)
{
_routeeType = routeeType;
Receive<string>(s => s == "hit", s =>
{
Sender.Tell(new Reply(_routeeType, Self));
});
}
}

public RoleName First { get; }
public RoleName Second { get; }

public ClusterBroadcastGroupSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = MultiNodeLoggingConfig.LoggingConfig
.WithFallback(DebugConfig(true))
.WithFallback(ConfigurationFactory.ParseString(@"
akka.actor.deployment {
/router1 {
router = broadcast-group
routees.paths = [""/user/myserviceA""]
cluster {
enabled = on
allow-local-routees = on
}
}
}"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

NodeConfig(new List<RoleName> { First }, new List<Config> { ConfigurationFactory.ParseString(@"akka.cluster.roles = [""a"", ""b""]") });
NodeConfig(new List<RoleName> { Second }, new List<Config> { ConfigurationFactory.ParseString(@"akka.cluster.roles = [""a""]") });

TestTransport = true;
}
}

public class ClusterBroadcastMultiNode1 : ClusterBroadcastGroupSpec { }
public class ClusterBroadcastMultiNode2 : ClusterBroadcastGroupSpec { }

/// <summary>
/// Used to verify that https://github.com/akkadotnet/akka.net/issues/2266 is reproducible and can be fixed
/// </summary>
public abstract class ClusterBroadcastGroupSpec : MultiNodeClusterSpec
{
private readonly ClusterBroadcastGroupSpecConfig _config;
private readonly Lazy<IActorRef> _router;

protected ClusterBroadcastGroupSpec() : this(new ClusterBroadcastGroupSpecConfig())
{
}

protected ClusterBroadcastGroupSpec(ClusterBroadcastGroupSpecConfig config) : base(config)
{
_config = config;

_router = new Lazy<IActorRef>(() => Sys.ActorOf(FromConfig.Instance.Props(), "router1"));
}

private IEnumerable<Routee> CurrentRoutees(IActorRef router)
{
var routerAsk = router.Ask<Routees>(new GetRoutees(), GetTimeoutOrDefault(null));
return routerAsk.Result.Members;
}

private Address FullAddress(IActorRef actorRef)
{
if (string.IsNullOrEmpty(actorRef.Path.Address.Host) || !actorRef.Path.Address.Port.HasValue)
return Cluster.SelfAddress;
return actorRef.Path.Address;
}

private Dictionary<Address, int> ReceiveReplays(ClusterBroadcastGroupSpecConfig.IRouteeType routeeType, int expectedReplies)
{
var zero = Roles.Select(GetAddress).ToDictionary(c => c, c => 0);
var replays = ReceiveWhile(5.Seconds(), msg =>
{
var routee = msg as ClusterBroadcastGroupSpecConfig.Reply;
if (routee != null && routee.RouteeType.GetType() == routeeType.GetType())
return FullAddress(routee.ActorRef);
return null;
}, expectedReplies).Aggregate(zero, (replyMap, address) =>
{
replyMap[address]++;
return replyMap;
});

return replays;
}

[MultiNodeFact]
public void ClusterBroadcastGroup()
{
A_cluster_router_with_a_BroadcastGroup_router_must_start_cluster_with_2_nodes();
A_cluster_router_with_a_BroadcastGroup_router_must_lookup_routees_on_the_member_nodes_in_the_cluster();
}

private void A_cluster_router_with_a_BroadcastGroup_router_must_start_cluster_with_2_nodes()
{
Log.Info("Waiting for cluster up");

AwaitClusterUp(_config.First, _config.Second);

RunOn(() =>
{
Log.Info("first, roles: " + Cluster.SelfRoles);
}, _config.First);

RunOn(() =>
{
Log.Info("second, roles: " + Cluster.SelfRoles);
}, _config.Second);

Log.Info("Cluster Up");

EnterBarrier("after-1");
}

private void A_cluster_router_with_a_BroadcastGroup_router_must_lookup_routees_on_the_member_nodes_in_the_cluster()
{
// cluster consists of first and second
Sys.ActorOf(Props.Create(() => new ClusterBroadcastGroupSpecConfig.SomeActor(new ClusterBroadcastGroupSpecConfig.GroupRoutee())), "myserviceA");
EnterBarrier("myservice-started");

RunOn(() =>
{
// 2 nodes, 1 routees on each node
Within(10.Seconds(), () =>
{
AwaitAssert(() => CurrentRoutees(_router.Value).Count().Should().Be(2)); //only seems to pass with a single routee should be 2
});

var routeeCount = CurrentRoutees(_router.Value).Count();
var iterationCount = 10;
for (int i = 0; i < iterationCount; i++)
{
_router.Value.Tell("hit");
}

var replays = ReceiveReplays(new ClusterBroadcastGroupSpecConfig.GroupRoutee(), iterationCount*routeeCount);

replays[GetAddress(_config.First)].Should().Be(10);
replays[GetAddress(_config.Second)].Should().Be(10);
replays.Values.Sum().Should().Be(iterationCount*routeeCount);
}, _config.First);

EnterBarrier("after-2");
}
}
}

29 changes: 29 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public class ClusterDeployerSpec : AkkaSpec
cluster.allow-local-routees = off
cluster.use-role = backend
}
/user/service3 {
dispatcher = mydispatcher
mailbox = mymailbox
router = broadcast-group
routees.paths = [""/user/myservice""]
cluster.enabled = on
cluster.allow-local-routees = off
cluster.use-role = backend
}
}
akka.remote.helios.tcp.port = 0");

Expand Down Expand Up @@ -85,6 +94,26 @@ public void RemoteDeployer_must_be_able_to_parse_akka_actor_deployment_with_spec
deployment.Dispatcher.ShouldBe("mydispatcher");
}

[Fact]
public void BugFix2266RemoteDeployer_must_be_able_to_parse_broadcast_group_cluster_router_with_default_nr_of_routees_routees()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reproduction spec which verifies that we weren't parsing the correct number of routees originally: https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Cluster/Configuration/Cluster.conf#L209

{
var service = "/user/service3";
var deployment = Sys.AsInstanceOf<ActorSystemImpl>().Provider.Deployer.Lookup(service.Split('/').Drop(1));
deployment.Should().NotBeNull();

deployment.Path.ShouldBe(service);
deployment.RouterConfig.GetType().ShouldBe(typeof(ClusterRouterGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.GetType().ShouldBe(typeof(BroadcastGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.AsInstanceOf<BroadcastGroup>().Paths.ShouldBe(new[] { "/user/myservice" });
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.TotalInstances.ShouldBe(10000);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AllowLocalRoutees.ShouldBe(false);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.UseRole.ShouldBe("backend");
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AsInstanceOf<ClusterRouterGroupSettings>().RouteesPaths.ShouldBe(new[] { "/user/myservice" });
deployment.Scope.ShouldBe(ClusterScope.Instance);
deployment.Mailbox.ShouldBe("mymailbox");
deployment.Dispatcher.ShouldBe("mydispatcher");
}

//todo: implement "have correct router mappings" test for adaptive load-balancing routers (not yet implemented)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public override Deploy ParseConfig(string key, Config config)
&& !config.HasPath("nr-of-instances"))
{
var maxTotalNrOfInstances = config
.WithFallback(ClusterConfigFactory.Default())
.WithFallback(Default)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes the bug reported in #2266 - correctly pulls in the akka.actor.deplyoment.default section and has us search for the default cluster.max-total-nr-of-instances inside of there, which fixes the issue.

.GetInt("cluster.max-total-nr-of-instances");
config2 = ConfigurationFactory.ParseString("nr-of-instances=" + maxTotalNrOfInstances)
.WithFallback(config);
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Cluster/Routing/ClusterRoutingConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ protected ClusterRouterActor(ClusterRouterSettingsBase settings)
}

Cluster = Cluster.Get(Context.System);
Nodes = ImmutableSortedSet.Create(Member.AddressOrdering, Cluster.ReadView.Members.Where(IsAvailable).Select(x => x.Address).ToArray());
Nodes = ImmutableSortedSet.Create(Member.AddressOrdering,
Cluster.ReadView.Members.Where(IsAvailable).Select(x => x.Address).ToArray());
}

public ClusterRouterSettingsBase Settings { get; protected set; }
Expand Down
9 changes: 6 additions & 3 deletions src/core/Akka/Actor/Deployer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@

namespace Akka.Actor
{
/// <summary>
/// Used to configure and deploy actors.
/// </summary>
public class Deployer
{
private readonly Config _default;
protected readonly Config Default;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to make the Default section visible to child classes.

private readonly Settings _settings;
private readonly AtomicReference<WildcardTree<Deploy>> _deployments = new AtomicReference<WildcardTree<Deploy>>(new WildcardTree<Deploy>());

public Deployer(Settings settings)
{
_settings = settings;
var config = settings.Config.GetConfig("akka.actor.deployment");
_default = config.GetConfig("default");
Default = config.GetConfig("default");

var rootObj = config.Root.GetObject();
if (rootObj == null) return;
Expand Down Expand Up @@ -90,7 +93,7 @@ public void SetDeploy(Deploy deploy)

public virtual Deploy ParseConfig(string key, Config config)
{
var deployment = config.WithFallback(_default);
var deployment = config.WithFallback(Default);
var routerType = deployment.GetString("router");
var router = CreateRouterConfig(routerType, deployment);
var dispatcher = deployment.GetString("dispatcher");
Expand Down
3 changes: 2 additions & 1 deletion src/examples/Chat/ChatServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ static void Main(string[] args)
applied-adapters = []
transport-protocol = tcp
port = 8081
hostname = localhost
hostname = 0.0.0.0
public-hostname = localhost
}
}
}
Expand Down