Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sharding update #3524

Merged
merged 8 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected ClusterShardingFailureSpecConfig(string mode)
serialization-bindings {{
""System.Object"" = hyperion
}}
}}
}}
akka.loglevel = INFO
akka.actor.provider = cluster
akka.remote.log-remote-lifecycle-events = off
Expand Down Expand Up @@ -176,7 +176,7 @@ public Entity()
private readonly ClusterShardingFailureSpecConfig _config;

private readonly List<FileInfo> _storageLocations;

protected ClusterShardingFailureSpec(ClusterShardingFailureSpecConfig config, Type type)
: base(config, type)
{
Expand All @@ -194,7 +194,7 @@ protected ClusterShardingFailureSpec(ClusterShardingFailureSpecConfig config, Ty
}

protected bool IsDDataMode { get; }

protected override void AfterTermination()
{
base.AfterTermination();
Expand Down Expand Up @@ -364,6 +364,13 @@ public void ClusterSharding_with_flaky_journal_network_should_recover_after_jour

//Test the Shard passivate works during a journal failure
shard2.Tell(new Passivate(PoisonPill.Instance), entity21);

AwaitAssert(() =>
{
region.Tell(new Get("21"));
ExpectMsg<Value>(v => v.Id == "21" && v.N == 0, hint: "Passivating did not reset Value down to 0");
});

region.Tell(new Add("21", 1));

region.Tell(new Get("21"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,15 @@ private Stop()

public const int NumberOfShards = 12;
private int _count = 0;
private readonly string id;

public Counter()
public static Props Props(string id) => Actor.Props.Create(() => new Counter(id));

public static string ShardingTypeName => "Counter";

public Counter(string id)
{
this.id = id;
Context.SetReceiveTimeout(TimeSpan.FromMinutes(2));
}

Expand All @@ -233,7 +239,7 @@ protected override void PostStop()
Thread.Sleep(500);
}

public override string PersistenceId { get { return "Counter-" + Self.Path.Name; } }
public override string PersistenceId { get { return $"Counter.{ShardingTypeName}-{id}"; } }

protected override bool ReceiveRecover(object message)
{
Expand Down Expand Up @@ -277,36 +283,52 @@ private void UpdateState(CounterChanged e)

internal class QualifiedCounter : Counter
{
public static Props Props(string typeName)
public static Props Props(string typeName, string id)
{
return Actor.Props.Create(() => new QualifiedCounter(typeName));
return Actor.Props.Create(() => new QualifiedCounter(typeName, id));
}

public readonly string TypeName;

public override string PersistenceId { get { return TypeName + "-" + Self.Path.Name; } }

public QualifiedCounter(string typeName)
public QualifiedCounter(string typeName, string id)
: base(id)
{
TypeName = typeName;
}
}

internal class AnotherCounter : QualifiedCounter
{
public AnotherCounter()
: base("AnotherCounter")
public static new Props Props(string id)
{
return Actor.Props.Create(() => new AnotherCounter(id));
}
public static new string ShardingTypeName => nameof(AnotherCounter);

public AnotherCounter(string id)
: base(AnotherCounter.ShardingTypeName, id)
{
}
}

internal class CounterSupervisor : ActorBase
{
public readonly IActorRef Counter;
public static string ShardingTypeName => nameof(CounterSupervisor);

public CounterSupervisor()
public static Props Props(string id)
{
Counter = Context.ActorOf(Props.Create<Counter>(), "theCounter");
return Actor.Props.Create(() => new CounterSupervisor(id));
}

public readonly string entityId;
public readonly IActorRef counter;

public CounterSupervisor(string entityId)
{
this.entityId = entityId;
counter = Context.ActorOf(Counter.Props(entityId), "theCounter");
}

protected override SupervisorStrategy SupervisorStrategy()
Expand All @@ -328,7 +350,7 @@ protected override SupervisorStrategy SupervisorStrategy()

protected override bool Receive(object message)
{
Counter.Forward(message);
counter.Forward(message);
return true;
}
}
Expand All @@ -355,6 +377,9 @@ protected DDataClusterShardingWithEntityRecoverySpec(DDataClusterShardingWithEnt
}
public abstract class ClusterShardingSpec : MultiNodeClusterSpec
{
// must use different unique name for some tests than the one used in API tests
public static string TestCounterShardingTypeName => $"Test{Counter.ShardingTypeName}";

#region Setup

private readonly Lazy<IActorRef> _region;
Expand All @@ -373,7 +398,7 @@ protected ClusterShardingSpec(ClusterShardingSpecConfig config, Type type)
{
_config = config;

_region = new Lazy<IActorRef>(() => CreateRegion("counter", false));
_region = new Lazy<IActorRef>(() => CreateRegion(TestCounterShardingTypeName, false));
_rebalancingRegion = new Lazy<IActorRef>(() => CreateRegion("rebalancingCounter", false));

_persistentEntitiesRegion = new Lazy<IActorRef>(() => CreateRegion("RememberCounterEntities", true));
Expand All @@ -397,7 +422,7 @@ protected ClusterShardingSpec(ClusterShardingSpecConfig config, Type type)
EnterBarrier("startup");
}
protected bool IsDDataMode { get; }

protected override void AfterTermination()
{
base.AfterTermination();
Expand Down Expand Up @@ -430,7 +455,7 @@ private void CreateCoordinator()
{
var typeNames = new[]
{
"counter", "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter",
TestCounterShardingTypeName, "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter",
"RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest"
};

Expand Down Expand Up @@ -479,7 +504,7 @@ private IActorRef CreateRegion(string typeName, bool rememberEntities)

return Sys.ActorOf(Props.Create(() => new ShardRegion(
typeName,
QualifiedCounter.Props(typeName),
entityId => QualifiedCounter.Props(typeName, entityId),
settings,
"/user/" + typeName + "Coordinator/singleton/coordinator",
Counter.ExtractEntityId,
Expand Down Expand Up @@ -609,7 +634,7 @@ public void ClusterSharding_should_use_second_node()
r.Tell(new Counter.EntityEnvelope(2, Counter.Increment.Instance));
r.Tell(new Counter.Get(2));
ExpectMsg(3);
LastSender.Path.Should().Be(Node(_config.Second) / "user" / "counterRegion" / "2" / "2");
LastSender.Path.Should().Be(Node(_config.Second) / "user" / $"{TestCounterShardingTypeName}Region" / "2" / "2");

r.Tell(new Counter.Get(11));
ExpectMsg(1);
Expand Down Expand Up @@ -669,9 +694,9 @@ public void ClusterSharding_should_support_proxy_only_mode()

var settings = ClusterShardingSettings.Create(cfg, Sys.Settings.Config.GetConfig("akka.cluster.singleton"));
var proxy = Sys.ActorOf(ShardRegion.ProxyProps(
typeName: "counter",
typeName: TestCounterShardingTypeName,
settings: settings,
coordinatorPath: "/user/counterCoordinator/singleton/coordinator",
coordinatorPath: $"/user/{TestCounterShardingTypeName}Coordinator/singleton/coordinator",
extractEntityId: Counter.ExtractEntityId,
extractShardId: Counter.ExtractShardId,
replicator: Sys.DeadLetters,
Expand Down Expand Up @@ -770,12 +795,12 @@ public void ClusterSharding_should_use_third_and_fourth_node()
r.Tell(new Counter.EntityEnvelope(3, Counter.Increment.Instance));
r.Tell(new Counter.Get(3));
ExpectMsg(11);
LastSender.Path.Should().Be(Node(_config.Third) / "user" / "counterRegion" / "3" / "3");
LastSender.Path.Should().Be(Node(_config.Third) / "user" / $"{TestCounterShardingTypeName}Region" / "3" / "3");

r.Tell(new Counter.EntityEnvelope(4, Counter.Increment.Instance));
r.Tell(new Counter.Get(4));
ExpectMsg(21);
LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / "counterRegion" / "4" / "4");
LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / $"{TestCounterShardingTypeName}Region" / "4" / "4");
}, _config.First);
EnterBarrier("first-update");

Expand Down Expand Up @@ -818,7 +843,7 @@ public void ClusterSharding_should_recover_coordinator_state_after_coordinator_c
{
_region.Value.Tell(new Counter.Get(3), probe3.Ref);
probe3.ExpectMsg(11);
probe3.LastSender.Path.Should().Be(Node(_config.Third) / "user" / "counterRegion" / "3" / "3");
probe3.LastSender.Path.Should().Be(Node(_config.Third) / "user" / $"{TestCounterShardingTypeName}Region" / "3" / "3");
});
});

Expand All @@ -829,7 +854,7 @@ public void ClusterSharding_should_recover_coordinator_state_after_coordinator_c
{
_region.Value.Tell(new Counter.Get(4), probe4.Ref);
probe4.ExpectMsg(21);
probe4.LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / "counterRegion" / "4" / "4");
probe4.LastSender.Path.Should().Be(Node(_config.Fourth) / "user" / $"{TestCounterShardingTypeName}Region" / "4" / "4");
});
});
}, _config.Fifth);
Expand Down Expand Up @@ -888,24 +913,24 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions()
{
//#counter-start
ClusterSharding.Get(Sys).Start(
typeName: "Counter",
entityProps: Props.Create<Counter>(),
typeName: Counter.ShardingTypeName,
entityPropsFactory: entityId => Counter.Props(entityId),
settings: ClusterShardingSettings.Create(Sys),
extractEntityId: Counter.ExtractEntityId,
extractShardId: Counter.ExtractShardId);

//#counter-start
ClusterSharding.Get(Sys).Start(
typeName: "AnotherCounter",
entityProps: Props.Create<AnotherCounter>(),
typeName: AnotherCounter.ShardingTypeName,
entityPropsFactory: entityId => AnotherCounter.Props(entityId),
settings: ClusterShardingSettings.Create(Sys),
extractEntityId: Counter.ExtractEntityId,
extractShardId: Counter.ExtractShardId);

//#counter-supervisor-start
ClusterSharding.Get(Sys).Start(
typeName: "SupervisedCounter",
entityProps: Props.Create<CounterSupervisor>(),
typeName: CounterSupervisor.ShardingTypeName,
entityPropsFactory: entityId => CounterSupervisor.Props(entityId),
settings: ClusterShardingSettings.Create(Sys),
extractEntityId: Counter.ExtractEntityId,
extractShardId: Counter.ExtractShardId);
Expand All @@ -915,18 +940,19 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions()
RunOn(() =>
{
//#counter-usage
var counterRegion = ClusterSharding.Get(Sys).ShardRegion("Counter");
counterRegion.Tell(new Counter.Get(123));
var counterRegion = ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName);
var entityId = 999;
counterRegion.Tell(new Counter.Get(entityId));
ExpectMsg(0);

counterRegion.Tell(new Counter.EntityEnvelope(123, Counter.Increment.Instance));
counterRegion.Tell(new Counter.Get(123));
counterRegion.Tell(new Counter.EntityEnvelope(entityId, Counter.Increment.Instance));
counterRegion.Tell(new Counter.Get(entityId));
ExpectMsg(1);
//#counter-usage

var anotherCounterRegion = ClusterSharding.Get(Sys).ShardRegion("AnotherCounter");
anotherCounterRegion.Tell(new Counter.EntityEnvelope(123, Counter.Decrement.Instance));
anotherCounterRegion.Tell(new Counter.Get(123));
var anotherCounterRegion = ClusterSharding.Get(Sys).ShardRegion(AnotherCounter.ShardingTypeName);
anotherCounterRegion.Tell(new Counter.EntityEnvelope(entityId, Counter.Decrement.Instance));
anotherCounterRegion.Tell(new Counter.Get(entityId));
ExpectMsg(-1);
}, _config.Fifth);
EnterBarrier("extension-used");
Expand All @@ -936,8 +962,8 @@ public void ClusterSharding_should_be_easy_to_use_with_extensions()
{
for (int i = 1000; i <= 1010; i++)
{
ClusterSharding.Get(Sys).ShardRegion("Counter").Tell(new Counter.EntityEnvelope(i, Counter.Increment.Instance));
ClusterSharding.Get(Sys).ShardRegion("Counter").Tell(new Counter.Get(i));
ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName).Tell(new Counter.EntityEnvelope(i, Counter.Increment.Instance));
ClusterSharding.Get(Sys).ShardRegion(Counter.ShardingTypeName).Tell(new Counter.Get(i));
ExpectMsg(1);
LastSender.Path.Address.Should().NotBe(Cluster.SelfAddress);
}
Expand All @@ -954,7 +980,7 @@ public void ClusterSharding_should_be_easy_API_for_starting()
{
var counterRegionViaStart = ClusterSharding.Get(Sys).Start(
typeName: "ApiTest",
entityProps: Props.Create<Counter>(),
entityPropsFactory: Counter.Props,
settings: ClusterShardingSettings.Create(Sys),
extractEntityId: Counter.ExtractEntityId,
extractShardId: Counter.ExtractShardId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardingConfigSpec.cs" company="Akka.NET Project">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copyright headers have the wrong file names

// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit.TestActors;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
public class GetShardTypeNamesSpec : Akka.TestKit.Xunit2.TestKit
{
public GetShardTypeNamesSpec() : base(GetConfig())
{
}

public static Config GetConfig()
{
return ConfigurationFactory.ParseString("akka.actor.provider = cluster")

.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

[Fact]
public void GetShardTypeNames_must_contain_empty_when_join_cluster_without_shards()
{
ClusterSharding.Get(Sys).ShardTypeNames.Should().BeEmpty();
}

[Fact]
public void GetShardTypeNames_must_contain_started_shards_when_started_2_shards()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
var settings = ClusterShardingSettings.Create(Sys);
ClusterSharding.Get(Sys).Start("type1", EchoActor.Props(this), settings, ExtractEntityId, ExtractShardId);
ClusterSharding.Get(Sys).Start("type2", EchoActor.Props(this), settings, ExtractEntityId, ExtractShardId);

ClusterSharding.Get(Sys).ShardTypeNames.ShouldBeEquivalentTo(new string[] { "type1", "type2" });
}

private Tuple<string, object> ExtractEntityId(object message)
{
switch (message)
{
case int i:
return new Tuple<string, object>(i.ToString(), message);
}
throw new NotSupportedException();
}

private string ExtractShardId(object message)
{
switch (message)
{
case int i:
return (i % 10).ToString();
}
throw new NotSupportedException();
}
}
}
Loading