Skip to content

Commit

Permalink
Don't passivate idle for remembering entities (akkadotnet#3833)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored and Aaronontheweb committed Jul 26, 2019
1 parent fecc2a3 commit 754e6b8
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 92 deletions.
2 changes: 1 addition & 1 deletion docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ To reduce memory consumption, you may decide to stop entities after some period

### Automatic Passivation

The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled.
The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if @ref:[Remembering Entities](#remembering-entities) is enabled.

## Remembering entities

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -48,7 +47,6 @@ protected override void OnReceive(object message)
switch (message)
{
case Passivate _:
Probe.Tell($"{_id} passivating");
Context.Stop(Self);
break;
default:
Expand Down Expand Up @@ -111,9 +109,9 @@ public static Config GetConfig()
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

protected IActorRef Start(TestProbe probe, bool rememberEntities)
protected IActorRef Start(TestProbe probe)
{
settings = ClusterShardingSettings.Create(Sys).WithRememberEntities(rememberEntities);
settings = ClusterShardingSettings.Create(Sys);
// single node cluster
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

Expand All @@ -133,9 +131,9 @@ protected TimeSpan TimeUntilPassivate(IActorRef region, TestProbe probe)
region.Tell(2);
var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
var timeOneSawMessage = responses.Single(r => r.Id == "1").When;

Expand All @@ -162,97 +160,21 @@ public InactiveEntityPassivationSpec()
public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration()
{
var probe = CreateTestProbe();
var region = Start(probe, false);
var region = Start(probe);

// make sure "1" hasn't seen a message in 3 seconds and passivates
//probe.ExpectNoMsg(TimeUntilPassivate(region, probe));

region.Tell(1);
region.Tell(2);
var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
var timeOneSawMessage = responses.Single(r => r.Id == "1").When;

Thread.Sleep(1000);
region.Tell(2);
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");
Thread.Sleep(1000);
region.Tell(2);
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");

var timeSinceOneSawAMessage = DateTime.Now.Ticks - timeOneSawMessage;
probe.ExpectNoMsg(settings.PassivateIdleEntityAfter - TimeSpan.FromTicks(timeSinceOneSawAMessage) - smallTolerance);

probe.ExpectMsg("1 passivating");
probe.ExpectNoMsg(TimeUntilPassivate(region, probe));

// but it can be re activated
region.Tell(1);
region.Tell(2);
responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
}
}

public class InactivePersistentEntityPassivationSpec : AbstractInactiveEntityPassivationSpec
{
public InactivePersistentEntityPassivationSpec()
: base(ConfigurationFactory.ParseString(@"akka.cluster.sharding.passivate-idle-entity-after = 3s"))
{
}

[Fact]
public void Passivation_of_inactive_persistent_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration()
{
var probe = CreateTestProbe();
var region = Start(probe, true);

region.Tell(1);
region.Tell(2);
var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");

Thread.Sleep(1500);

region.Tell(1);
probe.ExpectMsg<Entity.GotIt>(m => m.Id == "1");

Thread.Sleep(1500);

region.Tell(1);

probe.ExpectMsgAllOf<object>(
new Entity.GotIt("1", null, 0),
"2 passivating"
);

Thread.Sleep(1300);

region.Tell(1);
probe.ExpectMsg<Entity.GotIt>(m => m.Id == "1");

probe.ExpectMsg("1 passivating", TimeSpan.FromSeconds(5));

// but it can be re activated
region.Tell(1);
region.Tell(2);
responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
}
}

Expand All @@ -267,7 +189,7 @@ public DisabledInactiveEntityPassivationSpec()
public void Passivation_of_inactive_entities_must_not_passivate_when_passivation_is_disabled()
{
var probe = CreateTestProbe();
var region = Start(probe, false);
var region = Start(probe);
probe.ExpectNoMsg(TimeUntilPassivate(region, probe));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded
/// Passivate entities that have not received any message in this interval.
/// Note that only messages sent through sharding are counted, so direct messages
/// to the <see cref="IActorRef"/> of the actor or messages that it sends to itself are not counted as activity.
/// Use 0 to disable automatic passivation.
/// Use 0 to disable automatic passivation. It is always disabled if `RememberEntities` is enabled.
/// </summary>
public readonly TimeSpan PassivateIdleEntityAfter;

Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public DDataShard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public PersistentShard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public Shard(
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, PassivateIdleTick.Instance, Self)
: null;

Expand Down
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ protected object RegistrationMessage
protected override void PreStart()
{
Cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero)
if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities)
{
Log.Info($"Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]");
}
Expand Down
1 change: 1 addition & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ akka.cluster.sharding {

# Set this to a time duration to have sharding passivate entities when they have not
# gotten any message in this long time. Set to 'off' to disable.
# It is always disabled if `remember-entities` is enabled.
passivate-idle-entity-after = 120s

# If the coordinator can't store state changes it will be stopped
Expand Down

0 comments on commit 754e6b8

Please sign in to comment.