Skip to content

Commit

Permalink
Handle GetShardHome after rebalance (akkadotnet#3271)
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath authored and Aaronontheweb committed Jan 16, 2018
1 parent fd15e7b commit 881019e
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I
IActorRef IShardCoordinator.Sender => Sender;
public ILoggingAdapter Log { get; }
public ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; } = ImmutableDictionary<string, ICancelable>.Empty;
public ImmutableHashSet<string> RebalanceInProgress { get; set; } = ImmutableHashSet<string>.Empty;
public ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; } = ImmutableDictionary<string, ImmutableHashSet<IActorRef>>.Empty;
public ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> AliveRegions { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> RegionTerminationInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
Expand Down Expand Up @@ -189,10 +189,7 @@ private bool WaitingForStateInitialized(object message)
}
}

private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCallback,
ImmutableHashSet<IKey<IReplicatedData>> remainingKeys)
where TEvent : PersistentShardCoordinator.IDomainEvent
=> message =>
private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCallback, ImmutableHashSet<IKey<IReplicatedData>> remainingKeys) where TEvent : PersistentShardCoordinator.IDomainEvent => message =>
{
switch (message)
{
Expand Down Expand Up @@ -229,6 +226,11 @@ private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCal
ExceptionDispatchInfo.Capture(failure.Cause).Throw();
return true;

case PersistentShardCoordinator.GetShardHome getShardHome:
if (!this.HandleGetShardHome(getShardHome))
Stash.Stash();
return true;

default:
Stash.Stash();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I

public ILoggingAdapter Log { get; }
public ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; } = ImmutableDictionary<string, ICancelable>.Empty;
public ImmutableHashSet<string> RebalanceInProgress { get; set; } = ImmutableHashSet<string>.Empty;
public ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; } = ImmutableDictionary<string, ImmutableHashSet<IActorRef>>.Empty;
// regions that have requested handoff, for graceful shutdown
public ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> AliveRegions { get; set; } = ImmutableHashSet<IActorRef>.Empty;
Expand Down
117 changes: 82 additions & 35 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace Akka.Cluster.Sharding
{
using ShardId = String;

internal interface IShardCoordinator
{
PersistentShardCoordinator.State CurrentState { get; set; }
Expand All @@ -28,7 +28,7 @@ internal interface IShardCoordinator
IActorRef Sender { get; }
ILoggingAdapter Log { get; }
ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; }
ImmutableHashSet<string> RebalanceInProgress { get; set; }
ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; }
// regions that have requested handoff, for graceful shutdown
ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; }
ImmutableHashSet<IActorRef> AliveRegions { get; set; }
Expand Down Expand Up @@ -59,14 +59,36 @@ internal static bool Active<TCoordinator>(this TCoordinator coordinator, object
switch (message)
{
case PersistentShardCoordinator.Register msg: HandleRegister(coordinator, msg); return true;
case PersistentShardCoordinator.RegisterProxy msg:HandleRegisterProxy(coordinator, msg); return true;
case PersistentShardCoordinator.GetShardHome msg: HandleGetShardHome(coordinator, msg); return true;
case PersistentShardCoordinator.RegisterProxy msg: HandleRegisterProxy(coordinator, msg); return true;
case PersistentShardCoordinator.GetShardHome msg:
{
if (!HandleGetShardHome(coordinator, msg))
{
var shard = msg.Shard;
// location not known, yet
var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress);
if (activeRegions.Count != 0)
{
var getShardHomeSender = coordinator.Sender;
var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions);

// if task completed immediately, just continue
if (regionTask.IsCompleted && !regionTask.IsFaulted)
ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender);
else
regionTask.PipeTo(coordinator.Self,
success: region => new PersistentShardCoordinator.AllocateShardResult(shard, region, getShardHomeSender),
failure: _ => new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender));
}
}
return true;
}
case PersistentShardCoordinator.AllocateShardResult msg: HandleAllocateShardResult(coordinator, msg); return true;
case PersistentShardCoordinator.ShardStarted msg: HandleShardStated(coordinator, msg); return true;
case ResendShardHost msg: HandleResendShardHost(coordinator, msg); return true;
case RebalanceTick _: HandleRebalanceTick(coordinator); return true;
case PersistentShardCoordinator.RebalanceResult msg: ContinueRebalance(coordinator, msg.Shards); return true;
case RebalanceDone msg: HandleRebalanceDone(coordinator, msg); return true;
case RebalanceDone msg: HandleRebalanceDone(coordinator, msg.Shard, msg.Ok); return true;
case PersistentShardCoordinator.GracefulShutdownRequest msg: HandleGracefulShutdownRequest(coordinator, msg); return true;
case GetClusterShardingStats msg: HandleGetClusterShardingStats(coordinator, msg); return true;
case PersistentShardCoordinator.ShardHome _:
Expand Down Expand Up @@ -108,7 +130,7 @@ private static void SendHostShardMessage<TCoordinator>(this TCoordinator coordin
region.Tell(new PersistentShardCoordinator.HostShard(shard));
var cancel = coordinator.Context.System.Scheduler.ScheduleTellOnceCancelable(
coordinator.Settings.TunningParameters.ShardStartTimeout,
coordinator.Self,
coordinator.Self,
new ResendShardHost(shard, region),
coordinator.Self);
coordinator.UnAckedHostShards = coordinator.UnAckedHostShards.SetItem(shard, cancel);
Expand Down Expand Up @@ -199,32 +221,61 @@ private static void HandleGracefulShutdownRequest<TCoordinator>(this TCoordinato
}
}

private static void HandleRebalanceDone<TCoordinator>(this TCoordinator coordinator, RebalanceDone done) where TCoordinator : IShardCoordinator
private static void HandleRebalanceDone<TCoordinator>(this TCoordinator coordinator, string shard, bool ok) where TCoordinator : IShardCoordinator
{
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(done.Shard);
coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", done.Shard, done.Ok);
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard);
coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", shard, ok);

// The shard could have been removed by ShardRegionTerminated
if (coordinator.CurrentState.Shards.TryGetValue(done.Shard, out var region))
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var region))
{
if (done.Ok)
coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(done.Shard), e =>
if (ok)
coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(shard), e =>
{
coordinator.CurrentState = coordinator.CurrentState.Updated(e);
coordinator.Log.Debug("Shard [{0}] deallocated", e.Shard);
coordinator.ClearRebalanceInProgress(shard);
AllocateShardHomesForRememberEntities(coordinator);
});
else
{
// rebalance not completed, graceful shutdown will be retried
coordinator.GracefullShutdownInProgress = coordinator.GracefullShutdownInProgress.Remove(region);
coordinator.ClearRebalanceInProgress(shard);
}
}
else
{
coordinator.ClearRebalanceInProgress(shard);
}
}

/// <summary>
/// Finishes bookkeeping for a completed rebalance of <paramref name="shard"/>: every
/// <c>GetShardHome</c> requester that was deferred while the rebalance was running gets its
/// request re-sent to the coordinator (preserving the original sender), after which the
/// shard's entry is dropped from <c>RebalanceInProgress</c>. Does nothing when the shard
/// has no in-progress rebalance entry.
/// </summary>
private static void ClearRebalanceInProgress<TCoordinator>(this TCoordinator coordinator, string shard) where TCoordinator : IShardCoordinator
{
    // Guard clause: nothing recorded for this shard means nothing to replay or clear.
    if (!coordinator.RebalanceInProgress.TryGetValue(shard, out var deferredRequesters))
        return;

    // Replay each deferred request as if the original requester had just sent it.
    var request = new PersistentShardCoordinator.GetShardHome(shard);
    foreach (var requester in deferredRequesters)
        coordinator.Self.Tell(request, requester);

    coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard);
}

/// <summary>
/// Records a <c>GetShardHome</c> request that arrived for <paramref name="shard"/> while a
/// rebalance of that shard is in progress. The requester <paramref name="from"/> is added to
/// the shard's pending-senders set in <c>RebalanceInProgress</c>, so the request can be
/// replayed once the rebalance completes.
/// </summary>
private static void DeferGetShardHomeRequest<TCoordinator>(this TCoordinator coordinator, string shard, IActorRef from) where TCoordinator : IShardCoordinator
{
    // FIX: placeholders were {1}/{2} with only two format arguments supplied (shard, from);
    // they must be zero-based {0}/{1} for the log message to render both values.
    coordinator.Log.Debug("GetShardHome [{0}] request from [{1}] deferred, because rebalance is in progress for this shard. It will be handled when rebalance is done.", shard, from);

    // Start from the existing pending set for this shard, or an empty set on first deferral.
    var pending = coordinator.RebalanceInProgress.TryGetValue(shard, out var prev)
        ? prev
        : ImmutableHashSet<IActorRef>.Empty;
    coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, pending.Add(from));
}

private static void HandleRebalanceTick<TCoordinator>(this TCoordinator coordinator) where TCoordinator : IShardCoordinator
{
if (coordinator.CurrentState.Regions.Count != 0)
{
var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress);
var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress.Keys.ToImmutableHashSet());
if (shardsTask.IsCompleted && !shardsTask.IsFaulted)
ContinueRebalance(coordinator, shardsTask.Result);
else
Expand Down Expand Up @@ -259,17 +310,19 @@ private static void HandleAllocateShardResult<TCoordinator>(this TCoordinator co
ContinueGetShardHome(coordinator, allocateResult.Shard, allocateResult.ShardRegion, allocateResult.GetShardHomeSender);
}

private static void HandleGetShardHome<TCoordinator>(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator
internal static bool HandleGetShardHome<TCoordinator>(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator
{
var shard = getShardHome.Shard;

if (coordinator.RebalanceInProgress.Contains(shard))
if (coordinator.RebalanceInProgress.ContainsKey(shard))
{
coordinator.Log.Debug("GetShardHome [{0}] request ignored, because rebalance is in progress for this shard.", shard);
coordinator.DeferGetShardHomeRequest(shard, coordinator.Sender);
return true;
}
else if (!coordinator.HasAllRegionsRegistered())
{
coordinator.Log.Debug("GetShardHome [{0}] request ignored, because not all regions have registered yet.", shard);
return true;
}
else
{
Expand All @@ -279,24 +332,12 @@ private static void HandleGetShardHome<TCoordinator>(this TCoordinator coordinat
coordinator.Log.Debug("GetShardHome [{0}] request ignored, due to region [{1}] termination in progress.", shard, region);
else
coordinator.Sender.Tell(new PersistentShardCoordinator.ShardHome(shard, region));

return true;
}
else
{
var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress);
if (activeRegions.Count != 0)
{
var getShardHomeSender = coordinator.Sender;
var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions);

// if task completed immediately, just continue
if (regionTask.IsCompleted && !regionTask.IsFaulted)
ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender);
else
regionTask.ContinueWith(t => !(t.IsFaulted || t.IsCanceled)
? new PersistentShardCoordinator.AllocateShardResult(shard, t.Result, getShardHomeSender)
: new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender), TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(coordinator.Self);
}
return false;
}
}
}
Expand All @@ -306,6 +347,8 @@ private static void RegionTerminated<TCoordinator>(this TCoordinator coordinator
if (coordinator.CurrentState.Regions.TryGetValue(terminatedRef, out var shards))
{
coordinator.Log.Debug("ShardRegion terminated: [{0}]", terminatedRef);
coordinator.RegionTerminationInProgress = coordinator.RegionTerminationInProgress.Add(terminatedRef);

foreach (var shard in shards)
coordinator.Self.Tell(new PersistentShardCoordinator.GetShardHome(shard));

Expand Down Expand Up @@ -414,11 +457,11 @@ private static void ContinueRebalance<TCoordinator>(this TCoordinator coordinato
{
foreach (var shard in shards)
{
if (!coordinator.RebalanceInProgress.Contains(shard))
if (!coordinator.RebalanceInProgress.ContainsKey(shard))
{
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var rebalanceFromRegion))
{
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Add(shard);
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, ImmutableHashSet<IActorRef>.Empty);
coordinator.Log.Debug("Rebalance shard [{0}] from [{1}]", shard, rebalanceFromRegion);

var regions = coordinator.CurrentState.Regions.Keys.Union(coordinator.CurrentState.RegionProxies);
Expand All @@ -433,7 +476,7 @@ private static void ContinueRebalance<TCoordinator>(this TCoordinator coordinato

private static void ContinueGetShardHome<TCoordinator>(this TCoordinator coordinator, string shard, IActorRef region, IActorRef getShardHomeSender) where TCoordinator : IShardCoordinator
{
if (!coordinator.RebalanceInProgress.Contains(shard))
if (!coordinator.RebalanceInProgress.ContainsKey(shard))
{
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var aref))
{
Expand All @@ -456,6 +499,10 @@ private static void ContinueGetShardHome<TCoordinator>(this TCoordinator coordin
coordinator.Log.Debug("Allocated region {0} for shard [{1}] is not (any longer) one of the registered regions", region, shard);
}
}
else
{
coordinator.DeferGetShardHomeRequest(shard, getShardHomeSender);
}
}

#endregion
Expand Down

0 comments on commit 881019e

Please sign in to comment.