From 68ff6fbe38bba9d24d71206efc9c9cec9d43f275 Mon Sep 17 00:00:00 2001 From: Ismael Hamed Date: Mon, 1 Jul 2019 08:35:14 +0200 Subject: [PATCH] Don't passivate idle for remembering entities (#3833) --- docs/articles/clustering/cluster-sharding.md | 2 +- .../InactiveEntityPassivationSpec.cs | 90 ++++++++++++++----- .../ClusterShardingSettings.cs | 2 +- .../Akka.Cluster.Sharding/DDataShard.cs | 2 +- .../Akka.Cluster.Sharding/PersistentShard.cs | 2 +- .../cluster/Akka.Cluster.Sharding/Shard.cs | 2 +- .../Akka.Cluster.Sharding/ShardRegion.cs | 2 +- .../Akka.Cluster.Sharding/reference.conf | 70 ++++++++------- 8 files changed, 115 insertions(+), 57 deletions(-) diff --git a/docs/articles/clustering/cluster-sharding.md b/docs/articles/clustering/cluster-sharding.md index e4f91f964fb..5cf23f67925 100644 --- a/docs/articles/clustering/cluster-sharding.md +++ b/docs/articles/clustering/cluster-sharding.md @@ -82,7 +82,7 @@ To reduce memory consumption, you may decide to stop entities after some period ### Automatic Passivation -The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled. +The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if @ref:[Remembering Entities](#remembering-entities) is enabled. ## Remembering entities diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/InactiveEntityPassivationSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/InactiveEntityPassivationSpec.cs index a365de35051..12e09de0534 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/InactiveEntityPassivationSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/InactiveEntityPassivationSpec.cs @@ -13,13 +13,12 @@ using Akka.Cluster.Tools.Singleton; using Akka.Configuration; using Akka.TestKit; -using Akka.TestKit.Xunit2; using FluentAssertions; using Xunit; namespace Akka.Cluster.Sharding.Tests { - public class InactiveEntityPassivationSpec : AkkaSpec + public abstract class AbstractInactiveEntityPassivationSpec : AkkaSpec { #region Protocol @@ -48,7 +47,6 @@ protected override void OnReceive(object message) switch (message) { case Passivate _: - Probe.Tell($"{_id} passivating"); Context.Stop(Self); break; default: @@ -69,51 +67,69 @@ public GotIt(string id, object msg, long when) Msg = msg; When = when; } + + public override int GetHashCode() + { + return Id.GetHashCode(); + } + + public override bool Equals(object obj) + { + if (obj is GotIt other) + return Id == other.Id; + return false; + } } } #endregion + protected ClusterShardingSettings settings; + protected readonly TimeSpan smallTolerance = TimeSpan.FromMilliseconds(300); + private readonly ExtractEntityId _extractEntityId = message => message is int msg ? Tuple.Create(msg.ToString(), message) : null; private readonly ExtractShardId _extractShard = message => message is int msg ? (msg % 10).ToString(CultureInfo.InvariantCulture) : null; - public InactiveEntityPassivationSpec() - : base(GetConfig()) - { } + public AbstractInactiveEntityPassivationSpec(Config config) + : base(config.WithFallback(GetConfig())) + { + } public static Config GetConfig() { return ConfigurationFactory.ParseString(@" akka.loglevel = INFO akka.actor.provider = cluster - akka.cluster.sharding.passivate-idle-entity-after = 3s") + akka.cluster.sharding.passivate-idle-entity-after = 3s + akka.persistence.journal.plugin = ""akka.persistence.journal.inmem"" + akka.remote.dot-netty.tcp.port = 0") .WithFallback(ClusterSharding.DefaultConfig()) .WithFallback(ClusterSingletonManager.DefaultConfig()); } - [Fact] - public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration() + protected IActorRef Start(TestProbe probe) { - // Single node cluster + settings = ClusterShardingSettings.Create(Sys); + // single node cluster Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); - var probe = new TestProbe(Sys, new XunitAssertions()); - var settings = ClusterShardingSettings.Create(Sys); - var region = ClusterSharding.Get(Sys).Start( + return ClusterSharding.Get(Sys).Start( "myType", Entity.Props(probe.Ref), settings, _extractEntityId, _extractShard, - new LeastShardAllocationStrategy(10, 3), + ClusterSharding.Get(Sys).DefaultShardAllocationStrategy(settings), Passivate.Instance); - + } + + protected TimeSpan TimeUntilPassivate(IActorRef region, TestProbe probe) + { region.Tell(1); region.Tell(2); - var responses = new[] { probe.ExpectMsg(), @@ -129,16 +145,32 @@ public void Passivation_of_inactive_entities_must_passivate_entities_when_they_h region.Tell(2); probe.ExpectMsg().Id.ShouldBe("2"); - // Make sure "1" hasn't seen a message in 3 seconds and passivates var timeSinceOneSawAMessage = DateTime.Now.Ticks - timeOneSawMessage; - probe.ExpectNoMsg(TimeSpan.FromSeconds(3) - TimeSpan.FromTicks(timeSinceOneSawAMessage)); - probe.ExpectMsg("1 passivating"); + return settings.PassivateIdleEntityAfter - TimeSpan.FromTicks(timeSinceOneSawAMessage) + smallTolerance; + } + } + + public class InactiveEntityPassivationSpec : AbstractInactiveEntityPassivationSpec + { + public InactiveEntityPassivationSpec() + : base(ConfigurationFactory.ParseString(@"akka.cluster.sharding.passivate-idle-entity-after = 3s")) + { + } + + [Fact] + public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration() + { + var probe = CreateTestProbe(); + var region = Start(probe); - // But it can be re-activated just fine + // make sure "1" hasn't seen a message in 3 seconds and passivates + probe.ExpectNoMsg(TimeUntilPassivate(region, probe)); + + // but it can be re activated region.Tell(1); region.Tell(2); - responses = new[] + var responses = new[] { probe.ExpectMsg(), probe.ExpectMsg() @@ -146,4 +178,20 @@ public void Passivation_of_inactive_entities_must_passivate_entities_when_they_h responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2"); } } + + public class DisabledInactiveEntityPassivationSpec : AbstractInactiveEntityPassivationSpec + { + public DisabledInactiveEntityPassivationSpec() + : base(ConfigurationFactory.ParseString(@"akka.cluster.sharding.passivate-idle-entity-after = off")) + { + } + + [Fact] + public void Passivation_of_inactive_entities_must_not_passivate_when_passivation_is_disabled() + { + var probe = CreateTestProbe(); + var region = Start(probe); + probe.ExpectNoMsg(TimeUntilPassivate(region, probe)); + } + } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs index c79776f4313..71c39dc14ce 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingSettings.cs @@ -187,7 +187,7 @@ public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded /// Passivate entities that have not received any message in this interval. /// Note that only messages sent through sharding are counted, so direct messages /// to the of the actor or messages that it sends to itself are not counted as activity. - /// Use 0 to disable automatic passivation. + /// Use 0 to disable automatic passivation. It is always disabled if `RememberEntities` is enabled. /// public readonly TimeSpan PassivateIdleEntityAfter; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs index 8cadfd69ccb..12a6bd49272 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs @@ -100,7 +100,7 @@ public DDataShard( : EntityRecoveryStrategy.AllStrategy; var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2); - PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero + PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self) : null; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs index 771b8f01211..2e3da84febe 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs @@ -77,7 +77,7 @@ public PersistentShard( : EntityRecoveryStrategy.AllStrategy; var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2); - PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero + PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self) : null; } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index b201011582d..a6ebbb4d250 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -407,7 +407,7 @@ public Shard( : EntityRecoveryStrategy.AllStrategy; var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2); - PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero + PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, PassivateIdleTick.Instance, Self) : null; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index bf3aabde9c1..b78dfe12471 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -438,7 +438,7 @@ protected object RegistrationMessage protected override void PreStart() { Cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); - if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero) + if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities) { Log.Info($"Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]"); } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index 1b34c9580a1..4ed85ae8ce5 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -25,7 +25,8 @@ akka.cluster.sharding { # Set this to a time duration to have sharding passivate entities when they have not # gotten any message in this long time. Set to 'off' to disable. - passivate-idle-entity-after = off + # It is always disabled if `remember-entities` is enabled. + passivate-idle-entity-after = 120s # If the coordinator can't store state changes it will be stopped # and started again after this duration, with an exponential back-off @@ -40,6 +41,8 @@ akka.cluster.sharding { buffer-size = 100000 # Timeout of the shard rebalancing process. + # Additionally, if an entity doesn't handle the stopMessage + # after (handoff-timeout - 5.seconds).max(1.second) it will be stopped forcefully handoff-timeout = 60s # Time given to a region to acknowledge it's hosting a shard. @@ -91,7 +94,14 @@ akka.cluster.sharding { least-shard-allocation-strategy { # Threshold of how large the difference between most and least number of # allocated shards must be to begin the rebalancing. - rebalance-threshold = 10 + # The difference between number of shards in the region with most shards and + # the region with least shards must be greater than (>) the `rebalanceThreshold` + # for the rebalance to occur. + # 1 gives the best distribution and therefore typically the best choice. + # Increasing the threshold can result in quicker rebalance but has the + # drawback of increased difference between number of shards (and therefore load) + # on different nodes before rebalance will occur. + rebalance-threshold = 1 # The number of ongoing rebalancing processes is limited to this number. max-simultaneous-rebalance = 3 @@ -128,8 +138,8 @@ akka.cluster.sharding { # This dispatcher for the entity actors is defined by the user provided # Props, i.e. this dispatcher is not used for the entity actors. use-dispatcher = "" - - # Settings for the Distributed Data replicator. + + # Settings for the Distributed Data replicator. # Same layout as akka.cluster.distributed-data. # The "role" of the distributed-data configuration is not used. The distributed-data # role will be the same as "akka.cluster.sharding.role". @@ -140,12 +150,12 @@ akka.cluster.sharding { # minCap parameter to MajorityWrite and MajorityRead consistency level. majority-min-cap = 5 #durable.keys = ["shard-*"] - + # When using many entities with "remember entities" the Gossip message # can become to large if including to many in same message. Limit to # the same number as the number of ORSet per shard. max-delta-elements = 5 - + # Actor name of the Replicator actor, /system/ddataReplicator name = ddataReplicator @@ -158,7 +168,7 @@ akka.cluster.sharding { # How often the subscribers will be notified of changes, if any notify-subscribers-interval = 500 ms - + # The id of the dispatcher to use for Replicator actors. If not specified # default dispatcher is used. # If specified you need to define the settings of the actual dispatcher. @@ -167,53 +177,53 @@ akka.cluster.sharding { # How often the Replicator checks for pruning of data associated with # removed cluster nodes. pruning-interval = 30 s - + # How long time it takes (worst case) to spread the data to all other replica nodes. # This is used when initiating and completing the pruning process of data associated - # with removed cluster nodes. The time measurement is stopped when any replica is + # with removed cluster nodes. The time measurement is stopped when any replica is # unreachable, so it should be configured to worst case in a healthy cluster. max-pruning-dissemination = 60 s - - # Serialized Write and Read messages are cached when they are sent to + + # Serialized Write and Read messages are cached when they are sent to # several nodes. If no further activity they are removed from the cache # after this duration. serializer-cache-time-to-live = 10s - + delta-crdt { # Some complex deltas grow in size for each update and above this # threshold such deltas are discarded and sent as full state instead. - max-delta-size = 200 + max-delta-size = 200 } - + durable { # List of keys that are durable. Prefix matching is supported by using * at the - # end of a key. + # end of a key. keys = [] - + # The markers of that pruning has been performed for a removed node are kept for this # time and thereafter removed. If and old data entry that was never pruned is # injected and merged with existing data after this time the value will not be correct. # This would be possible if replica with durable data didn't participate in the pruning - # (e.g. it was shutdown) and later started after this time. A durable replica should not + # (e.g. it was shutdown) and later started after this time. A durable replica should not # be stopped for longer time than this duration and if it is joining again after this # duration its data should first be manually removed (from the lmdb directory). # It should be in the magnitude of days. Note that there is a corresponding setting # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'. pruning-marker-time-to-live = 10 d - + # Fully qualified class name of the durable store actor. It must be a subclass - # of akka.actor.Actor and handle the protocol defined in - # akka.cluster.ddata.DurableStore. The class must have a constructor with + # of akka.actor.Actor and handle the protocol defined in + # akka.cluster.ddata.DurableStore. The class must have a constructor with # com.typesafe.config.Config parameter. store-actor-class = "" - + use-dispatcher = akka.cluster.distributed-data.durable.pinned-store - + pinned-store { executor = thread-pool-executor type = PinnedDispatcher - } + } } lmdb { @@ -226,20 +236,20 @@ akka.cluster.sharding { # # When running in production you may want to configure this to a specific # path (alt 2), since the default directory contains the remote port of the - # actor system to make the name unique. If using a dynamically assigned - # port (0) it will be different each time and the previously stored data + # actor system to make the name unique. If using a dynamically assigned + # port (0) it will be different each time and the previously stored data # will not be loaded. dir = "ddata" - + # Size in bytes of the memory mapped file. map-size = 104857600 # 100MiB - + # Accumulate changes before storing improves performance with the # risk of losing the last writes if the JVM crashes. # The interval is by default set to 'off' to write each update immediately. - # Enabling write behind by specifying a duration, e.g. 200ms, is especially - # efficient when performing many writes to the same key, because it is only - # the last value for each key that will be serialized and stored. + # Enabling write behind by specifying a duration, e.g. 200ms, is especially + # efficient when performing many writes to the same key, because it is only + # the last value for each key that will be serialized and stored. # write-behind-interval = 200 ms write-behind-interval = off }