From a944672fefe8379f2586b7bdf3d70de40a03f2d5 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sun, 14 Jan 2018 22:01:38 +0100 Subject: [PATCH] Handle GetShardHome after rebalance --- .../DDataShardCoordinator.cs | 12 +- .../PersistentShardCoordinator.cs | 2 +- .../Akka.Cluster.Sharding/ShardCoordinator.cs | 117 ++++++++++++------ 3 files changed, 90 insertions(+), 41 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs index dcc24ceb6be..1a405342272 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShardCoordinator.cs @@ -29,7 +29,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I IActorRef IShardCoordinator.Sender => Sender; public ILoggingAdapter Log { get; } public ImmutableDictionary UnAckedHostShards { get; set; } = ImmutableDictionary.Empty; - public ImmutableHashSet RebalanceInProgress { get; set; } = ImmutableHashSet.Empty; + public ImmutableDictionary> RebalanceInProgress { get; set; } = ImmutableDictionary>.Empty; public ImmutableHashSet GracefullShutdownInProgress { get; set; } = ImmutableHashSet.Empty; public ImmutableHashSet AliveRegions { get; set; } = ImmutableHashSet.Empty; public ImmutableHashSet RegionTerminationInProgress { get; set; } = ImmutableHashSet.Empty; @@ -189,10 +189,7 @@ private bool WaitingForStateInitialized(object message) } } - private Receive WaitingForUpdate(TEvent e, Action afterUpdateCallback, - ImmutableHashSet> remainingKeys) - where TEvent : PersistentShardCoordinator.IDomainEvent - => message => + private Receive WaitingForUpdate(TEvent e, Action afterUpdateCallback, ImmutableHashSet> remainingKeys) where TEvent : PersistentShardCoordinator.IDomainEvent => message => { switch (message) { @@ -229,6 +226,11 @@ private Receive WaitingForUpdate(TEvent e, Action afterUpdateCal ExceptionDispatchInfo.Capture(failure.Cause).Throw(); return true; + case PersistentShardCoordinator.GetShardHome getShardHome: + if (!this.HandleGetShardHome(getShardHome)) + Stash.Stash(); + return true; + default: Stash.Stash(); return true; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 9e2a2226df6..cb876828c05 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -1257,7 +1257,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I public ILoggingAdapter Log { get; } public ImmutableDictionary UnAckedHostShards { get; set; } = ImmutableDictionary.Empty; - public ImmutableHashSet RebalanceInProgress { get; set; } = ImmutableHashSet.Empty; + public ImmutableDictionary> RebalanceInProgress { get; set; } = ImmutableDictionary>.Empty; // regions that have requested handoff, for graceful shutdown public ImmutableHashSet GracefullShutdownInProgress { get; set; } = ImmutableHashSet.Empty; public ImmutableHashSet AliveRegions { get; set; } = ImmutableHashSet.Empty; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs index 170f8ca17bb..1aaf953ce54 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs @@ -15,7 +15,7 @@ namespace Akka.Cluster.Sharding { using ShardId = String; - + internal interface IShardCoordinator { PersistentShardCoordinator.State CurrentState { get; set; } @@ -28,7 +28,7 @@ internal interface IShardCoordinator IActorRef Sender { get; } ILoggingAdapter Log { get; } ImmutableDictionary UnAckedHostShards { get; set; } - ImmutableHashSet RebalanceInProgress { get; set; } + ImmutableDictionary> RebalanceInProgress { get; set; } // regions that have requested handoff, for graceful shutdown ImmutableHashSet GracefullShutdownInProgress { get; set; } ImmutableHashSet AliveRegions { get; set; } @@ -59,14 +59,36 @@ internal static bool Active(this TCoordinator coordinator, object switch (message) { case PersistentShardCoordinator.Register msg: HandleRegister(coordinator, msg); return true; - case PersistentShardCoordinator.RegisterProxy msg:HandleRegisterProxy(coordinator, msg); return true; - case PersistentShardCoordinator.GetShardHome msg: HandleGetShardHome(coordinator, msg); return true; + case PersistentShardCoordinator.RegisterProxy msg: HandleRegisterProxy(coordinator, msg); return true; + case PersistentShardCoordinator.GetShardHome msg: + { + if (!HandleGetShardHome(coordinator, msg)) + { + var shard = msg.Shard; + // location not known, yet + var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress); + if (activeRegions.Count != 0) + { + var getShardHomeSender = coordinator.Sender; + var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions); + + // if task completed immediately, just continue + if (regionTask.IsCompleted && !regionTask.IsFaulted) + ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender); + else + regionTask.PipeTo(coordinator.Self, + success: region => new PersistentShardCoordinator.AllocateShardResult(shard, region, getShardHomeSender), + failure: _ => new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender)); + } + } + return true; + } case PersistentShardCoordinator.AllocateShardResult msg: HandleAllocateShardResult(coordinator, msg); return true; case PersistentShardCoordinator.ShardStarted msg: HandleShardStated(coordinator, msg); return true; case ResendShardHost msg: HandleResendShardHost(coordinator, msg); return true; case RebalanceTick _: HandleRebalanceTick(coordinator); return true; case PersistentShardCoordinator.RebalanceResult msg: ContinueRebalance(coordinator, msg.Shards); return true; - case RebalanceDone msg: HandleRebalanceDone(coordinator, msg); return true; + case RebalanceDone msg: HandleRebalanceDone(coordinator, msg.Shard, msg.Ok); return true; case PersistentShardCoordinator.GracefulShutdownRequest msg: HandleGracefulShutdownRequest(coordinator, msg); return true; case GetClusterShardingStats msg: HandleGetClusterShardingStats(coordinator, msg); return true; case PersistentShardCoordinator.ShardHome _: @@ -108,7 +130,7 @@ private static void SendHostShardMessage(this TCoordinator coordin region.Tell(new PersistentShardCoordinator.HostShard(shard)); var cancel = coordinator.Context.System.Scheduler.ScheduleTellOnceCancelable( coordinator.Settings.TunningParameters.ShardStartTimeout, - coordinator.Self, + coordinator.Self, new ResendShardHost(shard, region), coordinator.Self); coordinator.UnAckedHostShards = coordinator.UnAckedHostShards.SetItem(shard, cancel); @@ -199,32 +221,61 @@ private static void HandleGracefulShutdownRequest(this TCoordinato } } - private static void HandleRebalanceDone(this TCoordinator coordinator, RebalanceDone done) where TCoordinator : IShardCoordinator + private static void HandleRebalanceDone(this TCoordinator coordinator, string shard, bool ok) where TCoordinator : IShardCoordinator { - coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(done.Shard); - coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", done.Shard, done.Ok); + coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard); + coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", shard, ok); // The shard could have been removed by ShardRegionTerminated - if (coordinator.CurrentState.Shards.TryGetValue(done.Shard, out var region)) + if (coordinator.CurrentState.Shards.TryGetValue(shard, out var region)) { - if (done.Ok) - coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(done.Shard), e => + if (ok) + coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(shard), e => { coordinator.CurrentState = coordinator.CurrentState.Updated(e); - coordinator.Log.Debug("Shard [{0}] deallocated", e.Shard); + coordinator.ClearRebalanceInProgress(shard); AllocateShardHomesForRememberEntities(coordinator); }); else + { // rebalance not completed, graceful shutdown will be retried coordinator.GracefullShutdownInProgress = coordinator.GracefullShutdownInProgress.Remove(region); + coordinator.ClearRebalanceInProgress(shard); + } + } + else + { + coordinator.ClearRebalanceInProgress(shard); + } + } + + private static void ClearRebalanceInProgress(this TCoordinator coordinator, string shard) where TCoordinator : IShardCoordinator + { + if (coordinator.RebalanceInProgress.TryGetValue(shard, out var pendingGetShardHome)) + { + var msg = new PersistentShardCoordinator.GetShardHome(shard); + foreach (var sender in pendingGetShardHome) + { + coordinator.Self.Tell(msg, sender); + } + coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard); } } + private static void DeferGetShardHomeRequest(this TCoordinator coordinator, string shard, IActorRef from) where TCoordinator : IShardCoordinator + { + coordinator.Log.Debug("GetShardHome [{1}] request from [{2}] deferred, because rebalance is in progress for this shard. It will be handled when rebalance is done.", shard, from); + var pending = coordinator.RebalanceInProgress.TryGetValue(shard, out var prev) + ? prev + : ImmutableHashSet.Empty; + coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, pending.Add(from)); + } + private static void HandleRebalanceTick(this TCoordinator coordinator) where TCoordinator : IShardCoordinator { if (coordinator.CurrentState.Regions.Count != 0) { - var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress); + var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress.Keys.ToImmutableHashSet()); if (shardsTask.IsCompleted && !shardsTask.IsFaulted) ContinueRebalance(coordinator, shardsTask.Result); else @@ -259,17 +310,19 @@ private static void HandleAllocateShardResult(this TCoordinator co ContinueGetShardHome(coordinator, allocateResult.Shard, allocateResult.ShardRegion, allocateResult.GetShardHomeSender); } - private static void HandleGetShardHome(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator + internal static bool HandleGetShardHome(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator { var shard = getShardHome.Shard; - if (coordinator.RebalanceInProgress.Contains(shard)) + if (coordinator.RebalanceInProgress.ContainsKey(shard)) { - coordinator.Log.Debug("GetShardHome [{0}] request ignored, because rebalance is in progress for this shard.", shard); + coordinator.DeferGetShardHomeRequest(shard, coordinator.Sender); + return true; } else if (!coordinator.HasAllRegionsRegistered()) { coordinator.Log.Debug("GetShardHome [{0}] request ignored, because not all regions have registered yet.", shard); + return true; } else { @@ -279,24 +332,12 @@ private static void HandleGetShardHome(this TCoordinator coordinat coordinator.Log.Debug("GetShardHome [{0}] request ignored, due to region [{1}] termination in progress.", shard, region); else coordinator.Sender.Tell(new PersistentShardCoordinator.ShardHome(shard, region)); + + return true; } else { - var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress); - if (activeRegions.Count != 0) - { - var getShardHomeSender = coordinator.Sender; - var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions); - - // if task completed immediately, just continue - if (regionTask.IsCompleted && !regionTask.IsFaulted) - ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender); - else - regionTask.ContinueWith(t => !(t.IsFaulted || t.IsCanceled) - ? new PersistentShardCoordinator.AllocateShardResult(shard, t.Result, getShardHomeSender) - : new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender), TaskContinuationOptions.ExecuteSynchronously) - .PipeTo(coordinator.Self); - } + return false; } } } @@ -306,6 +347,8 @@ private static void RegionTerminated(this TCoordinator coordinator if (coordinator.CurrentState.Regions.TryGetValue(terminatedRef, out var shards)) { coordinator.Log.Debug("ShardRegion terminated: [{0}]", terminatedRef); + coordinator.RegionTerminationInProgress = coordinator.RegionTerminationInProgress.Add(terminatedRef); + foreach (var shard in shards) coordinator.Self.Tell(new PersistentShardCoordinator.GetShardHome(shard)); @@ -414,11 +457,11 @@ private static void ContinueRebalance(this TCoordinator coordinato { foreach (var shard in shards) { - if (!coordinator.RebalanceInProgress.Contains(shard)) + if (!coordinator.RebalanceInProgress.ContainsKey(shard)) { if (coordinator.CurrentState.Shards.TryGetValue(shard, out var rebalanceFromRegion)) { - coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Add(shard); + coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, ImmutableHashSet.Empty); coordinator.Log.Debug("Rebalance shard [{0}] from [{1}]", shard, rebalanceFromRegion); var regions = coordinator.CurrentState.Regions.Keys.Union(coordinator.CurrentState.RegionProxies); @@ -433,7 +476,7 @@ private static void ContinueRebalance(this TCoordinator coordinato private static void ContinueGetShardHome(this TCoordinator coordinator, string shard, IActorRef region, IActorRef getShardHomeSender) where TCoordinator : IShardCoordinator { - if (!coordinator.RebalanceInProgress.Contains(shard)) + if (!coordinator.RebalanceInProgress.ContainsKey(shard)) { if (coordinator.CurrentState.Shards.TryGetValue(shard, out var aref)) { @@ -456,6 +499,10 @@ private static void ContinueGetShardHome(this TCoordinator coordin coordinator.Log.Debug("Allocated region {0} for shard [{1}] is not (any longer) one of the registered regions", region, shard); } } + else + { + coordinator.DeferGetShardHomeRequest(shard, getShardHomeSender); + } } #endregion