sharding snapshot & journal cleanup
zbynek001 committed May 25, 2017
1 parent cfb650b commit 77aeb4a
Showing 4 changed files with 104 additions and 13 deletions.
@@ -57,6 +57,10 @@ public class TunningParameters
/// <summary>
/// TBD
/// </summary>
public readonly int KeepNrOfBatches;
/// <summary>
/// TBD
/// </summary>
public readonly int LeastShardAllocationRebalanceThreshold;
/// <summary>
/// TBD
@@ -78,6 +82,7 @@ public class TunningParameters
/// <param name="entityRestartBackoff">TBD</param>
/// <param name="rebalanceInterval">TBD</param>
/// <param name="snapshotAfter">TBD</param>
/// <param name="keepNrOfBatches"TBD></param>
/// <param name="leastShardAllocationRebalanceThreshold">TBD</param>
/// <param name="leastShardAllocationMaxSimultaneousRebalance">TBD</param>
/// <exception cref="ArgumentException">
@@ -94,6 +99,7 @@ public TunningParameters(
TimeSpan entityRestartBackoff,
TimeSpan rebalanceInterval,
int snapshotAfter,
int keepNrOfBatches,
int leastShardAllocationRebalanceThreshold,
int leastShardAllocationMaxSimultaneousRebalance,
string entityRecoveryStrategy,
@@ -112,6 +118,7 @@ public TunningParameters(
EntityRestartBackoff = entityRestartBackoff;
RebalanceInterval = rebalanceInterval;
SnapshotAfter = snapshotAfter;
KeepNrOfBatches = keepNrOfBatches;
LeastShardAllocationRebalanceThreshold = leastShardAllocationRebalanceThreshold;
LeastShardAllocationMaxSimultaneousRebalance = leastShardAllocationMaxSimultaneousRebalance;
EntityRecoveryStrategy = entityRecoveryStrategy;
@@ -133,21 +140,21 @@ public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded
public readonly string Role;

/// <summary>
/// True if active entity actors shall be automatically restarted upon <see cref="Shard"/> restart, i.e.
/// if the <see cref="Shard"/> is started on a different <see cref="ShardRegion"/> due to rebalance or crash.
/// </summary>
public readonly bool RememberEntities;

/// <summary>
/// Absolute path to the journal plugin configuration entity that is to be used for the internal
/// persistence of ClusterSharding. If not defined the default journal plugin is used. Note that
/// this is not related to persistence used by the entity actors.
/// </summary>
public readonly string JournalPluginId;

/// <summary>
/// Absolute path to the snapshot plugin configuration entity that is to be used for the internal persistence
/// of ClusterSharding. If not defined the default snapshot plugin is used. Note that this is not related
/// to persistence used by the entity actors.
/// </summary>
public readonly string SnapshotPluginId;
@@ -193,6 +200,7 @@ public static ClusterShardingSettings Create(Config config, Config singletonConf
entityRestartBackoff: config.GetTimeSpan("entity-restart-backoff"),
rebalanceInterval: config.GetTimeSpan("rebalance-interval"),
snapshotAfter: config.GetInt("snapshot-after"),
keepNrOfBatches: config.GetInt("keep-nr-of-batches"),
leastShardAllocationRebalanceThreshold: config.GetInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance: config.GetInt("least-shard-allocation-strategy.max-simultaneous-rebalance"),
entityRecoveryStrategy: config.GetString("entity-recovery-strategy"),
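
The new tuning parameter is read from `akka.cluster.sharding.keep-nr-of-batches` and surfaced as `TunningParameters.KeepNrOfBatches`. The following is a minimal sketch of overriding it from application HOCON; it is not part of this commit, and the bootstrap APIs used around it (`ConfigurationFactory`, `ClusterSharding.DefaultConfig()`, `ClusterSingletonManager.DefaultConfig()`, `ClusterShardingSettings.Create(ActorSystem)`) are assumed from the broader Akka.NET API rather than shown in this diff.

using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;

// Hypothetical bootstrap: override the retention-related settings in HOCON and let
// ClusterShardingSettings pick them up; reference.conf supplies everything else.
var config = ConfigurationFactory.ParseString(@"
    akka.actor.provider = cluster
    akka.cluster.sharding {
        snapshot-after = 1000
        keep-nr-of-batches = 2
    }")
    .WithFallback(ClusterSharding.DefaultConfig())
    .WithFallback(ClusterSingletonManager.DefaultConfig());

using (var system = ActorSystem.Create("sharded-system", config))
{
    var settings = ClusterShardingSettings.Create(system);
    // KeepNrOfBatches now travels alongside SnapshotAfter inside TunningParameters.
    system.Log.Info("keep-nr-of-batches = {0}", settings.TunningParameters.KeepNrOfBatches);
}
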
45 changes: 42 additions & 3 deletions src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
@@ -18,7 +18,7 @@ namespace Akka.Cluster.Sharding
using Msg = Object;

/// <summary>
/// This actor creates children entity actors on demand that it is told to be
/// responsible for. It is used when `rememberEntities` is enabled.
/// </summary>
public class PersistentShard : Shard
@@ -69,7 +69,46 @@ public PersistentShard(

protected override bool ReceiveCommand(object message)
{
return HandleCommand(message);
switch (message)
{
case SaveSnapshotSuccess m:
Log.Debug("PersistentShard snapshot saved successfully");
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging if something goes wrong in production it's very useful to be able to inspect the events
*/
var deleteToSequenceNr = m.Metadata.SequenceNr - Settings.TunningParameters.KeepNrOfBatches * Settings.TunningParameters.SnapshotAfter;
if (deleteToSequenceNr > 0)
{
DeleteMessages(deleteToSequenceNr);
}
break;
case SaveSnapshotFailure m:
Log.Warning("PersistentShard snapshot failure: {0}", m.Cause.Message);
break;
case DeleteMessagesSuccess m:
Log.Debug("PersistentShard messages to {0} deleted successfully", m.ToSequenceNr);
DeleteSnapshots(new SnapshotSelectionCriteria(m.ToSequenceNr - 1));
break;
case DeleteMessagesFailure m:
Log.Warning("PersistentShard messages to {0} deletion failure: {1}", m.ToSequenceNr, m.Cause.Message);
break;
case DeleteSnapshotsSuccess m:
Log.Debug("PersistentShard snapshots matching {0} deleted successfully", m.Criteria);
break;
case DeleteSnapshotsFailure m:
Log.Warning("PersistentShard snapshots matching {0} deletion failure: {1}", m.Criteria, m.Cause.Message);
break;
default:
return HandleCommand(message);
}
return true;
}

/// <summary>
@@ -182,7 +221,7 @@ protected override void DeliverTo(string id, object message, object payload, IAc
else
child.Tell(payload, sender);
}

private void RestartRememberedEntities()
{
RememberedEntitiesRecoveryStrategy.RecoverEntities(State.Entries).ForEach(scheduledRecovery =>
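
To make the retention arithmetic above concrete, here is a small worked sketch. The numbers are illustrative, not from the commit; it assumes snapshots land on multiples of `snapshot-after`, which is how `SaveSnapshotIfNeeded` is expected to schedule them.

// Illustrative values only; in the shard they come from Settings.TunningParameters
// and from the SaveSnapshotSuccess.Metadata.SequenceNr of the snapshot just saved.
const int snapshotAfter = 1000;
const int keepNrOfBatches = 2;
const long snapshotSequenceNr = 3000;

long deleteToSequenceNr = snapshotSequenceNr - keepNrOfBatches * snapshotAfter; // 3000 - 2000 = 1000
if (deleteToSequenceNr > 0)
{
    // DeleteMessages(1000): events 1..1000 are removed, events 1001..3000 (two batches) stay replayable.
    // After DeleteMessagesSuccess, DeleteSnapshots(new SnapshotSelectionCriteria(999)) drops snapshots
    // below sequence number 1000, leaving the ones at 1000, 2000 and the fresh one at 3000.
    System.Console.WriteLine($"delete journal up to {deleteToSequenceNr}, snapshots up to {deleteToSequenceNr - 1}");
}
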
@@ -790,22 +790,59 @@ private bool WaitingForStateInitialized(object message)
else return HandleSnapshotResult(message);
}


private bool HandleSnapshotResult(object message)
{
if (message is SaveSnapshotSuccess) Log.Debug("Persistent snapshot saved successfully");
else if (message is SaveSnapshotFailure) Log.Warning("Persistent snapshot failure: {0}", ((SaveSnapshotFailure)message).Cause.Message);
else return false;
switch (message)
{
case SaveSnapshotSuccess m:
Log.Debug("Persistent snapshot saved successfully");
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging if something goes wrong in production it's very useful to be able to inspect the events
*/
var deleteToSequenceNr = m.Metadata.SequenceNr - Settings.TunningParameters.KeepNrOfBatches * Settings.TunningParameters.SnapshotAfter;
if (deleteToSequenceNr > 0)
{
DeleteMessages(deleteToSequenceNr);
}
break;

case SaveSnapshotFailure m:
Log.Warning("Persistent snapshot failure: {0}", m.Cause.Message);
break;
case DeleteMessagesSuccess m:
Log.Debug("Persistent messages to {0} deleted successfully", m.ToSequenceNr);
DeleteSnapshots(new SnapshotSelectionCriteria(m.ToSequenceNr - 1));
break;
case DeleteMessagesFailure m:
Log.Warning("Persistent messages to {0} deletion failure: {1}", m.ToSequenceNr, m.Cause.Message);
break;
case DeleteSnapshotsSuccess m:
Log.Debug("Persistent snapshots matching {0} deleted successfully", m.Criteria);
break;
case DeleteSnapshotsFailure m:
Log.Warning("Persistent snapshots matching {0} deletion failure: {1}", m.Criteria, m.Cause.Message);
break;
default:
return false;
}
return true;
}

/// <summary>
/// TBD
/// </summary>
/// <typeparam name="TEvent">TBD</typeparam>
/// <param name="e">TBD</param>
/// <param name="handler">TBD</param>
/// <returns>TBD</returns>
protected void Update<TEvent>(TEvent e, Action<TEvent> handler) where TEvent : IDomainEvent
{
SaveSnapshotIfNeeded();
Persist(e, handler);
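
The same snapshot-then-trim chain generalizes to any Akka.Persistence actor. The sketch below is not from this commit; it shows the pattern (SaveSnapshot → SaveSnapshotSuccess → DeleteMessages → DeleteMessagesSuccess → DeleteSnapshots) in a minimal ReceivePersistentActor with hypothetical names and hard-coded batch sizes standing in for the tuning parameters.

using Akka.Persistence;

public class CleanupExampleActor : ReceivePersistentActor
{
    private const int SnapshotAfter = 1000;   // stands in for tuning-parameters.snapshot-after
    private const int KeepNrOfBatches = 2;    // stands in for keep-nr-of-batches
    private int _counter;

    public override string PersistenceId => "cleanup-example";

    public CleanupExampleActor()
    {
        Recover<int>(value => _counter = value);
        Recover<SnapshotOffer>(offer => _counter = (int)offer.Snapshot);

        // every incoming string bumps the counter; snapshot once per batch of events
        Command<string>(_ => Persist(_counter + 1, persisted =>
        {
            _counter = persisted;
            if (LastSequenceNr % SnapshotAfter == 0)
                SaveSnapshot(_counter);
        }));

        // snapshot stored: trim the journal but keep the last few batches of events around
        Command<SaveSnapshotSuccess>(success =>
        {
            var deleteTo = success.Metadata.SequenceNr - KeepNrOfBatches * SnapshotAfter;
            if (deleteTo > 0)
                DeleteMessages(deleteTo);
        });

        // journal trimmed: now drop the snapshots that predate the kept events
        Command<DeleteMessagesSuccess>(success =>
            DeleteSnapshots(new SnapshotSelectionCriteria(success.ToSequenceNr - 1)));
    }
}
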
7 changes: 7 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
@@ -76,6 +76,13 @@ akka.cluster.sharding {
# events. Snapshots are used to reduce recovery times.
snapshot-after = 1000

# The shard deletes persistent events (messages and snapshots) after a snapshot has been
# completed, keeping this number of old persistent batches around.
# A batch is of size `snapshot-after`.
# When set to 0, all messages with a sequence number equal to or lower than the snapshot's
# are deleted as soon as the snapshot has been saved successfully.
# The default value of 2 leaves at most 2 * `snapshot-after` messages and 3 snapshots
# (2 old ones plus the fresh snapshot).
keep-nr-of-batches = 2

# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
