Don't passivate idle for remembering entities (akkadotnet#3833)
ismaelhamed authored and Aaronontheweb committed Jul 21, 2019
1 parent 9d7e3bc commit 68ff6fb
Showing 8 changed files with 115 additions and 57 deletions.
2 changes: 1 addition & 1 deletion docs/articles/clustering/cluster-sharding.md
@@ -82,7 +82,7 @@ To reduce memory consumption, you may decide to stop entities after some period

### Automatic Passivation

The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled.
The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if [Remembering Entities](#remembering-entities) is enabled.
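
To make the setting concrete, here is a minimal sketch (not code from this commit; the system name, the 3 second timeout and `remember-entities = off` are example values) that reads the resulting `ClusterShardingSettings` and evaluates the condition this change introduces:

```csharp
// Minimal sketch with example values only; not code from this commit.
using System;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;

public static class PassivationConfigExample
{
    public static void Main()
    {
        var config = ConfigurationFactory.ParseString(@"
                akka.actor.provider = cluster
                akka.remote.dot-netty.tcp.port = 0
                akka.cluster.sharding.passivate-idle-entity-after = 3s
                akka.cluster.sharding.remember-entities = off")
            .WithFallback(ClusterSharding.DefaultConfig())
            .WithFallback(ClusterSingletonManager.DefaultConfig());

        var system = ActorSystem.Create("PassivationExample", config);
        try
        {
            var settings = ClusterShardingSettings.Create(system);

            // With this change, idle passivation is only in effect when the timeout is
            // positive AND remember-entities is disabled.
            var idlePassivationActive =
                settings.PassivateIdleEntityAfter > TimeSpan.Zero && !settings.RememberEntities;

            Console.WriteLine(
                $"PassivateIdleEntityAfter = {settings.PassivateIdleEntityAfter}, " +
                $"idle passivation active = {idlePassivationActive}");
        }
        finally
        {
            system.Terminate().Wait();
        }
    }
}
```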

## Remembering entities

@@ -13,13 +13,12 @@
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
public class InactiveEntityPassivationSpec : AkkaSpec
public abstract class AbstractInactiveEntityPassivationSpec : AkkaSpec
{
#region Protocol

@@ -48,7 +47,6 @@ protected override void OnReceive(object message)
switch (message)
{
case Passivate _:
Probe.Tell($"{_id} passivating");
Context.Stop(Self);
break;
default:
@@ -69,51 +67,69 @@ public GotIt(string id, object msg, long when)
Msg = msg;
When = when;
}

public override int GetHashCode()
{
return Id.GetHashCode();
}

public override bool Equals(object obj)
{
if (obj is GotIt other)
return Id == other.Id;
return false;
}
}
}

#endregion

protected ClusterShardingSettings settings;
protected readonly TimeSpan smallTolerance = TimeSpan.FromMilliseconds(300);

private readonly ExtractEntityId _extractEntityId = message =>
message is int msg ? Tuple.Create(msg.ToString(), message) : null;

private readonly ExtractShardId _extractShard = message =>
message is int msg ? (msg % 10).ToString(CultureInfo.InvariantCulture) : null;

public InactiveEntityPassivationSpec()
: base(GetConfig())
{ }
public AbstractInactiveEntityPassivationSpec(Config config)
: base(config.WithFallback(GetConfig()))
{
}

public static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.actor.provider = cluster
akka.cluster.sharding.passivate-idle-entity-after = 3s")
akka.cluster.sharding.passivate-idle-entity-after = 3s
akka.persistence.journal.plugin = ""akka.persistence.journal.inmem""
akka.remote.dot-netty.tcp.port = 0")
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

[Fact]
public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration()
protected IActorRef Start(TestProbe probe)
{
// Single node cluster
settings = ClusterShardingSettings.Create(Sys);
// single node cluster
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = new TestProbe(Sys, new XunitAssertions());
var settings = ClusterShardingSettings.Create(Sys);
var region = ClusterSharding.Get(Sys).Start(
return ClusterSharding.Get(Sys).Start(
"myType",
Entity.Props(probe.Ref),
settings,
_extractEntityId,
_extractShard,
new LeastShardAllocationStrategy(10, 3),
ClusterSharding.Get(Sys).DefaultShardAllocationStrategy(settings),
Passivate.Instance);

}

protected TimeSpan TimeUntilPassivate(IActorRef region, TestProbe probe)
{
region.Tell(1);
region.Tell(2);

var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
@@ -129,21 +145,53 @@ public void Passivation_of_inactive_entities_must_passivate_entities_when_they_h
region.Tell(2);
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");

// Make sure "1" hasn't seen a message in 3 seconds and passivates
var timeSinceOneSawAMessage = DateTime.Now.Ticks - timeOneSawMessage;
probe.ExpectNoMsg(TimeSpan.FromSeconds(3) - TimeSpan.FromTicks(timeSinceOneSawAMessage));
probe.ExpectMsg("1 passivating");
return settings.PassivateIdleEntityAfter - TimeSpan.FromTicks(timeSinceOneSawAMessage) + smallTolerance;
}
}

public class InactiveEntityPassivationSpec : AbstractInactiveEntityPassivationSpec
{
public InactiveEntityPassivationSpec()
: base(ConfigurationFactory.ParseString(@"akka.cluster.sharding.passivate-idle-entity-after = 3s"))
{
}

[Fact]
public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration()
{
var probe = CreateTestProbe();
var region = Start(probe);

// But it can be re-activated just fine
// make sure "1" hasn't seen a message in 3 seconds and passivates
probe.ExpectNoMsg(TimeUntilPassivate(region, probe));

// but it can be re-activated
region.Tell(1);
region.Tell(2);

responses = new[]
var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
}
}

public class DisabledInactiveEntityPassivationSpec : AbstractInactiveEntityPassivationSpec
{
public DisabledInactiveEntityPassivationSpec()
: base(ConfigurationFactory.ParseString(@"akka.cluster.sharding.passivate-idle-entity-after = off"))
{
}

[Fact]
public void Passivation_of_inactive_entities_must_not_passivate_when_passivation_is_disabled()
{
var probe = CreateTestProbe();
var region = Start(probe);
probe.ExpectNoMsg(TimeUntilPassivate(region, probe));
}
}
}
@@ -187,7 +187,7 @@ public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded
/// Passivate entities that have not received any message in this interval.
/// Note that only messages sent through sharding are counted, so direct messages
/// to the <see cref="IActorRef"/> of the actor or messages that it sends to itself are not counted as activity.
/// Use 0 to disable automatic passivation.
/// Use 0 to disable automatic passivation. It is always disabled if `RememberEntities` is enabled.
/// </summary>
public readonly TimeSpan PassivateIdleEntityAfter;
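
For illustration, the following is a simplified sketch of the guarded idle-tick scheduling pattern that this commit applies in `Shard`, `PersistentShard` and `DDataShard`. `IdleTick` and `IdleCheckActor` are hypothetical names used only for this sketch; it is not the actual shard implementation.

```csharp
// Simplified sketch of the guarded idle-tick pattern; hypothetical names.
using System;
using Akka.Actor;

public sealed class IdleTick
{
    public static readonly IdleTick Instance = new IdleTick();
    private IdleTick() { }
}

public class IdleCheckActor : UntypedActor
{
    private readonly ICancelable _passivateIdleTask;

    public IdleCheckActor(TimeSpan passivateIdleEntityAfter, bool rememberEntities)
    {
        // Tick at half the configured timeout, but only when passivation is enabled
        // and remember-entities is off, mirroring the condition added in this commit.
        var idleInterval = TimeSpan.FromTicks(passivateIdleEntityAfter.Ticks / 2);
        _passivateIdleTask = passivateIdleEntityAfter > TimeSpan.Zero && !rememberEntities
            ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
                idleInterval, idleInterval, Self, IdleTick.Instance, Self)
            : null;
    }

    protected override void OnReceive(object message)
    {
        if (message is IdleTick)
        {
            // The real Shard walks its entities here and passivates the idle ones.
        }
        else
        {
            Unhandled(message);
        }
    }

    protected override void PostStop()
    {
        _passivateIdleTask?.Cancel();
        base.PostStop();
    }
}
```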

2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs
@@ -100,7 +100,7 @@ public DDataShard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;

@@ -77,7 +77,7 @@ public PersistentShard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;
}
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
@@ -407,7 +407,7 @@ public Shard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, PassivateIdleTick.Instance, Self)
: null;

2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
@@ -438,7 +438,7 @@ protected object RegistrationMessage
protected override void PreStart()
{
Cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero)
if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities)
{
Log.Info($"Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]");
}
70 changes: 40 additions & 30 deletions src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
@@ -25,7 +25,8 @@ akka.cluster.sharding {

# Set this to a time duration to have sharding passivate entities when they have not
# received any message within this duration. Set to 'off' to disable.
passivate-idle-entity-after = off
# It is always disabled if `remember-entities` is enabled.
passivate-idle-entity-after = 120s

# If the coordinator can't store state changes it will be stopped
# and started again after this duration, with an exponential back-off
@@ -40,6 +41,8 @@
buffer-size = 100000

# Timeout of the shard rebalancing process.
# Additionally, if an entity doesn't handle the stopMessage
# within (handoff-timeout - 5 seconds), with a minimum of 1 second, it will be stopped forcefully
handoff-timeout = 60s
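
As a worked example of the window described in the comment above (the helper below is hypothetical, not an API from this commit): with the default `handoff-timeout = 60s`, an entity that never handles the stop message is stopped forcefully after 55 seconds.

```csharp
// Hypothetical helper illustrating the (handoff-timeout - 5s), minimum 1s, window.
using System;

public static class HandOffTimeoutExample
{
    public static TimeSpan EntityForcedStopAfter(TimeSpan handOffTimeout)
    {
        var window = handOffTimeout - TimeSpan.FromSeconds(5);
        var minimum = TimeSpan.FromSeconds(1);
        return window > minimum ? window : minimum;
    }

    public static void Main()
    {
        // With the default of 60s, the forced stop happens after 55s.
        Console.WriteLine(EntityForcedStopAfter(TimeSpan.FromSeconds(60))); // 00:00:55
    }
}
```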

# Time given to a region to acknowledge it's hosting a shard.
@@ -91,7 +94,14 @@
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
# allocated shards must be to begin the rebalancing.
rebalance-threshold = 10
# The difference between number of shards in the region with most shards and
# the region with least shards must be greater than (>) the `rebalanceThreshold`
# for the rebalance to occur.
# 1 gives the best distribution and therefore typically the best choice.
# Increasing the threshold can result in quicker rebalance but has the
# drawback of increased difference between number of shards (and therefore load)
# on different nodes before rebalance will occur.
rebalance-threshold = 1

# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
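
To relate the threshold to code, both ways of supplying an allocation strategy appear in the test changes of this commit. The sketch below is illustrative only; the `"counter"` type name, `ShardEnvelope` and the extractor logic are placeholders.

```csharp
// Illustrative sketch: placeholder type name, envelope and extractors.
using System;
using Akka.Actor;
using Akka.Cluster.Sharding;

public static class ShardingStartupExample
{
    // Hypothetical envelope used only by the extractors below.
    public sealed class ShardEnvelope
    {
        public ShardEnvelope(string entityId, object payload)
        {
            EntityId = entityId;
            Payload = payload;
        }

        public string EntityId { get; }
        public object Payload { get; }
    }

    public static IActorRef StartRegion(ActorSystem system, Props entityProps)
    {
        var sharding = ClusterSharding.Get(system);
        var settings = ClusterShardingSettings.Create(system);

        ExtractEntityId extractEntityId = message =>
            message is ShardEnvelope e ? Tuple.Create(e.EntityId, e.Payload) : null;

        ExtractShardId extractShardId = message =>
            message is ShardEnvelope e
                ? (Math.Abs(e.EntityId.GetHashCode()) % 10).ToString()
                : null;

        // Config-driven default, which picks up the least-shard-allocation-strategy
        // settings above, as the updated spec in this commit does:
        var allocationStrategy = sharding.DefaultShardAllocationStrategy(settings);

        // Alternatively, construct it explicitly:
        // new LeastShardAllocationStrategy(1, 3); // rebalance-threshold, max-simultaneous-rebalance

        return sharding.Start(
            "counter",            // typeName
            entityProps,          // entity Props
            settings,
            extractEntityId,
            extractShardId,
            allocationStrategy,
            PoisonPill.Instance); // handOffStopMessage
    }
}
```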
@@ -128,8 +138,8 @@
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
use-dispatcher = ""
# Settings for the Distributed Data replicator.

# Settings for the Distributed Data replicator.
# Same layout as akka.cluster.distributed-data.
# The "role" of the distributed-data configuration is not used. The distributed-data
# role will be the same as "akka.cluster.sharding.role".
Expand All @@ -140,12 +150,12 @@ akka.cluster.sharding {
# minCap parameter to MajorityWrite and MajorityRead consistency level.
majority-min-cap = 5
#durable.keys = ["shard-*"]

# When using many entities with "remember entities" the Gossip message
# can become too large if including too many in the same message. Limit to
# the same number as the number of ORSet per shard.
max-delta-elements = 5

# Actor name of the Replicator actor, /system/ddataReplicator
name = ddataReplicator

@@ -140,12 +150,12 @@

# How often the subscribers will be notified of changes, if any
notify-subscribers-interval = 500 ms

# The id of the dispatcher to use for Replicator actors. If not specified
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
@@ -158,7 +168,7 @@
# How often the Replicator checks for pruning of data associated with
# removed cluster nodes.
pruning-interval = 30 s

# How long time it takes (worst case) to spread the data to all other replica nodes.
# This is used when initiating and completing the pruning process of data associated
# with removed cluster nodes. The time measurement is stopped when any replica is
# with removed cluster nodes. The time measurement is stopped when any replica is
# unreachable, so it should be configured to worst case in a healthy cluster.
max-pruning-dissemination = 60 s
# Serialized Write and Read messages are cached when they are sent to

# Serialized Write and Read messages are cached when they are sent to
# several nodes. If no further activity they are removed from the cache
# after this duration.
serializer-cache-time-to-live = 10s

delta-crdt {

# Some complex deltas grow in size for each update and above this
# threshold such deltas are discarded and sent as full state instead.
max-delta-size = 200
max-delta-size = 200
}

durable {
# List of keys that are durable. Prefix matching is supported by using * at the
# end of a key.
# end of a key.
keys = []

# The markers recording that pruning has been performed for a removed node are kept for this
# time and thereafter removed. If an old data entry that was never pruned is
# injected and merged with existing data after this time the value will not be correct.
# This would be possible if replica with durable data didn't participate in the pruning
# (e.g. it was shutdown) and later started after this time. A durable replica should not
# (e.g. it was shutdown) and later started after this time. A durable replica should not
# be stopped for longer time than this duration and if it is joining again after this
# duration its data should first be manually removed (from the lmdb directory).
# It should be in the magnitude of days. Note that there is a corresponding setting
# for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
pruning-marker-time-to-live = 10 d

# Fully qualified class name of the durable store actor. It must be a subclass
# of akka.actor.Actor and handle the protocol defined in
# akka.cluster.ddata.DurableStore. The class must have a constructor with
# of akka.actor.Actor and handle the protocol defined in
# akka.cluster.ddata.DurableStore. The class must have a constructor with
# com.typesafe.config.Config parameter.
store-actor-class = ""

use-dispatcher = akka.cluster.distributed-data.durable.pinned-store

pinned-store {
executor = thread-pool-executor
type = PinnedDispatcher
}
}
}

lmdb {
Expand All @@ -226,20 +236,20 @@ akka.cluster.sharding {
#
# When running in production you may want to configure this to a specific
# path (alt 2), since the default directory contains the remote port of the
# actor system to make the name unique. If using a dynamically assigned
# port (0) it will be different each time and the previously stored data
# actor system to make the name unique. If using a dynamically assigned
# port (0) it will be different each time and the previously stored data
# will not be loaded.
dir = "ddata"

# Size in bytes of the memory mapped file.
map-size = 104857600 # 100MiB

# Accumulating changes before storing them improves performance, with the
# risk of losing the last writes if the process crashes.
# The interval is by default set to 'off' to write each update immediately.
# Enabling write behind by specifying a duration, e.g. 200ms, is especially
# efficient when performing many writes to the same key, because it is only
# the last value for each key that will be serialized and stored.
# Enabling write behind by specifying a duration, e.g. 200ms, is especially
# efficient when performing many writes to the same key, because it is only
# the last value for each key that will be serialized and stored.
# write-behind-interval = 200 ms
write-behind-interval = off
}