Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle GetShardHome after rebalance #3271

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I
IActorRef IShardCoordinator.Sender => Sender;
public ILoggingAdapter Log { get; }
public ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; } = ImmutableDictionary<string, ICancelable>.Empty;
public ImmutableHashSet<string> RebalanceInProgress { get; set; } = ImmutableHashSet<string>.Empty;
public ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; } = ImmutableDictionary<string, ImmutableHashSet<IActorRef>>.Empty;
public ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> AliveRegions { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> RegionTerminationInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
Expand Down Expand Up @@ -189,10 +189,7 @@ private bool WaitingForStateInitialized(object message)
}
}

private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCallback,
ImmutableHashSet<IKey<IReplicatedData>> remainingKeys)
where TEvent : PersistentShardCoordinator.IDomainEvent
=> message =>
private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCallback, ImmutableHashSet<IKey<IReplicatedData>> remainingKeys) where TEvent : PersistentShardCoordinator.IDomainEvent => message =>
{
switch (message)
{
Expand Down Expand Up @@ -229,6 +226,11 @@ private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCal
ExceptionDispatchInfo.Capture(failure.Cause).Throw();
return true;

case PersistentShardCoordinator.GetShardHome getShardHome:
if (!this.HandleGetShardHome(getShardHome))
Stash.Stash();
return true;

default:
Stash.Stash();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ internal static Props Props(string typeName, ClusterShardingSettings settings, I

public ILoggingAdapter Log { get; }
public ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; } = ImmutableDictionary<string, ICancelable>.Empty;
public ImmutableHashSet<string> RebalanceInProgress { get; set; } = ImmutableHashSet<string>.Empty;
public ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; } = ImmutableDictionary<string, ImmutableHashSet<IActorRef>>.Empty;
// regions that have requested handoff, for graceful shutdown
public ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableHashSet<IActorRef> AliveRegions { get; set; } = ImmutableHashSet<IActorRef>.Empty;
Expand Down
117 changes: 82 additions & 35 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace Akka.Cluster.Sharding
{
using ShardId = String;

internal interface IShardCoordinator
{
PersistentShardCoordinator.State CurrentState { get; set; }
Expand All @@ -28,7 +28,7 @@ internal interface IShardCoordinator
IActorRef Sender { get; }
ILoggingAdapter Log { get; }
ImmutableDictionary<string, ICancelable> UnAckedHostShards { get; set; }
ImmutableHashSet<string> RebalanceInProgress { get; set; }
ImmutableDictionary<string, ImmutableHashSet<IActorRef>> RebalanceInProgress { get; set; }
// regions that have requested handoff, for graceful shutdown
ImmutableHashSet<IActorRef> GracefullShutdownInProgress { get; set; }
ImmutableHashSet<IActorRef> AliveRegions { get; set; }
Expand Down Expand Up @@ -59,14 +59,36 @@ internal static bool Active<TCoordinator>(this TCoordinator coordinator, object
switch (message)
{
case PersistentShardCoordinator.Register msg: HandleRegister(coordinator, msg); return true;
case PersistentShardCoordinator.RegisterProxy msg:HandleRegisterProxy(coordinator, msg); return true;
case PersistentShardCoordinator.GetShardHome msg: HandleGetShardHome(coordinator, msg); return true;
case PersistentShardCoordinator.RegisterProxy msg: HandleRegisterProxy(coordinator, msg); return true;
case PersistentShardCoordinator.GetShardHome msg:
{
if (!HandleGetShardHome(coordinator, msg))
{
var shard = msg.Shard;
// location not known, yet
var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress);
if (activeRegions.Count != 0)
{
var getShardHomeSender = coordinator.Sender;
var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions);

// if task completed immediately, just continue
if (regionTask.IsCompleted && !regionTask.IsFaulted)
ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender);
else
regionTask.PipeTo(coordinator.Self,
success: region => new PersistentShardCoordinator.AllocateShardResult(shard, region, getShardHomeSender),
failure: _ => new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender));
}
}
return true;
}
case PersistentShardCoordinator.AllocateShardResult msg: HandleAllocateShardResult(coordinator, msg); return true;
case PersistentShardCoordinator.ShardStarted msg: HandleShardStated(coordinator, msg); return true;
case ResendShardHost msg: HandleResendShardHost(coordinator, msg); return true;
case RebalanceTick _: HandleRebalanceTick(coordinator); return true;
case PersistentShardCoordinator.RebalanceResult msg: ContinueRebalance(coordinator, msg.Shards); return true;
case RebalanceDone msg: HandleRebalanceDone(coordinator, msg); return true;
case RebalanceDone msg: HandleRebalanceDone(coordinator, msg.Shard, msg.Ok); return true;
case PersistentShardCoordinator.GracefulShutdownRequest msg: HandleGracefulShutdownRequest(coordinator, msg); return true;
case GetClusterShardingStats msg: HandleGetClusterShardingStats(coordinator, msg); return true;
case PersistentShardCoordinator.ShardHome _:
Expand Down Expand Up @@ -108,7 +130,7 @@ private static void SendHostShardMessage<TCoordinator>(this TCoordinator coordin
region.Tell(new PersistentShardCoordinator.HostShard(shard));
var cancel = coordinator.Context.System.Scheduler.ScheduleTellOnceCancelable(
coordinator.Settings.TunningParameters.ShardStartTimeout,
coordinator.Self,
coordinator.Self,
new ResendShardHost(shard, region),
coordinator.Self);
coordinator.UnAckedHostShards = coordinator.UnAckedHostShards.SetItem(shard, cancel);
Expand Down Expand Up @@ -199,32 +221,61 @@ private static void HandleGracefulShutdownRequest<TCoordinator>(this TCoordinato
}
}

private static void HandleRebalanceDone<TCoordinator>(this TCoordinator coordinator, RebalanceDone done) where TCoordinator : IShardCoordinator
private static void HandleRebalanceDone<TCoordinator>(this TCoordinator coordinator, string shard, bool ok) where TCoordinator : IShardCoordinator
{
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(done.Shard);
coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", done.Shard, done.Ok);
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard);
coordinator.Log.Debug("Rebalance shard [{0}] done [{1}]", shard, ok);

// The shard could have been removed by ShardRegionTerminated
if (coordinator.CurrentState.Shards.TryGetValue(done.Shard, out var region))
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var region))
{
if (done.Ok)
coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(done.Shard), e =>
if (ok)
coordinator.Update(new PersistentShardCoordinator.ShardHomeDeallocated(shard), e =>
{
coordinator.CurrentState = coordinator.CurrentState.Updated(e);
coordinator.Log.Debug("Shard [{0}] deallocated", e.Shard);
coordinator.ClearRebalanceInProgress(shard);
AllocateShardHomesForRememberEntities(coordinator);
});
else
{
// rebalance not completed, graceful shutdown will be retried
coordinator.GracefullShutdownInProgress = coordinator.GracefullShutdownInProgress.Remove(region);
coordinator.ClearRebalanceInProgress(shard);
}
}
else
{
coordinator.ClearRebalanceInProgress(shard);
}
}

private static void ClearRebalanceInProgress<TCoordinator>(this TCoordinator coordinator, string shard) where TCoordinator : IShardCoordinator
{
    // Nothing to clear unless a rebalance of this shard parked any GetShardHome requests.
    if (!coordinator.RebalanceInProgress.TryGetValue(shard, out var deferredSenders))
        return;

    // Replay every deferred GetShardHome to ourselves on behalf of its original sender,
    // so the request can now be answered with the shard's post-rebalance home.
    var request = new PersistentShardCoordinator.GetShardHome(shard);
    foreach (var deferredSender in deferredSenders)
        coordinator.Self.Tell(request, deferredSender);

    // Drop the shard's entry: the rebalance is finished and its queue has been drained.
    coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Remove(shard);
}

private static void DeferGetShardHomeRequest<TCoordinator>(this TCoordinator coordinator, string shard, IActorRef from) where TCoordinator : IShardCoordinator
{
    // Fix: placeholders must be {0}/{1} — only two format arguments (shard, from) are
    // supplied, so the original {1}/{2} indices were off by one and {2} had no argument.
    coordinator.Log.Debug("GetShardHome [{0}] request from [{1}] deferred, because rebalance is in progress for this shard. It will be handled when rebalance is done.", shard, from);

    // Record the requester under the shard's in-progress-rebalance entry; the deferred
    // request is replayed by ClearRebalanceInProgress once the rebalance completes.
    var pending = coordinator.RebalanceInProgress.TryGetValue(shard, out var prev)
        ? prev
        : ImmutableHashSet<IActorRef>.Empty;
    coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, pending.Add(from));
}

private static void HandleRebalanceTick<TCoordinator>(this TCoordinator coordinator) where TCoordinator : IShardCoordinator
{
if (coordinator.CurrentState.Regions.Count != 0)
{
var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress);
var shardsTask = coordinator.AllocationStrategy.Rebalance(coordinator.CurrentState.Regions, coordinator.RebalanceInProgress.Keys.ToImmutableHashSet());
if (shardsTask.IsCompleted && !shardsTask.IsFaulted)
ContinueRebalance(coordinator, shardsTask.Result);
else
Expand Down Expand Up @@ -259,17 +310,19 @@ private static void HandleAllocateShardResult<TCoordinator>(this TCoordinator co
ContinueGetShardHome(coordinator, allocateResult.Shard, allocateResult.ShardRegion, allocateResult.GetShardHomeSender);
}

private static void HandleGetShardHome<TCoordinator>(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator
internal static bool HandleGetShardHome<TCoordinator>(this TCoordinator coordinator, PersistentShardCoordinator.GetShardHome getShardHome) where TCoordinator : IShardCoordinator
{
var shard = getShardHome.Shard;

if (coordinator.RebalanceInProgress.Contains(shard))
if (coordinator.RebalanceInProgress.ContainsKey(shard))
{
coordinator.Log.Debug("GetShardHome [{0}] request ignored, because rebalance is in progress for this shard.", shard);
coordinator.DeferGetShardHomeRequest(shard, coordinator.Sender);
return true;
}
else if (!coordinator.HasAllRegionsRegistered())
{
coordinator.Log.Debug("GetShardHome [{0}] request ignored, because not all regions have registered yet.", shard);
return true;
}
else
{
Expand All @@ -279,24 +332,12 @@ private static void HandleGetShardHome<TCoordinator>(this TCoordinator coordinat
coordinator.Log.Debug("GetShardHome [{0}] request ignored, due to region [{1}] termination in progress.", shard, region);
else
coordinator.Sender.Tell(new PersistentShardCoordinator.ShardHome(shard, region));

return true;
}
else
{
var activeRegions = coordinator.CurrentState.Regions.RemoveRange(coordinator.GracefullShutdownInProgress);
if (activeRegions.Count != 0)
{
var getShardHomeSender = coordinator.Sender;
var regionTask = coordinator.AllocationStrategy.AllocateShard(getShardHomeSender, shard, activeRegions);

// if task completed immediately, just continue
if (regionTask.IsCompleted && !regionTask.IsFaulted)
ContinueGetShardHome(coordinator, shard, regionTask.Result, getShardHomeSender);
else
regionTask.ContinueWith(t => !(t.IsFaulted || t.IsCanceled)
? new PersistentShardCoordinator.AllocateShardResult(shard, t.Result, getShardHomeSender)
: new PersistentShardCoordinator.AllocateShardResult(shard, null, getShardHomeSender), TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(coordinator.Self);
}
return false;
}
}
}
Expand All @@ -306,6 +347,8 @@ private static void RegionTerminated<TCoordinator>(this TCoordinator coordinator
if (coordinator.CurrentState.Regions.TryGetValue(terminatedRef, out var shards))
{
coordinator.Log.Debug("ShardRegion terminated: [{0}]", terminatedRef);
coordinator.RegionTerminationInProgress = coordinator.RegionTerminationInProgress.Add(terminatedRef);

foreach (var shard in shards)
coordinator.Self.Tell(new PersistentShardCoordinator.GetShardHome(shard));

Expand Down Expand Up @@ -414,11 +457,11 @@ private static void ContinueRebalance<TCoordinator>(this TCoordinator coordinato
{
foreach (var shard in shards)
{
if (!coordinator.RebalanceInProgress.Contains(shard))
if (!coordinator.RebalanceInProgress.ContainsKey(shard))
{
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var rebalanceFromRegion))
{
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.Add(shard);
coordinator.RebalanceInProgress = coordinator.RebalanceInProgress.SetItem(shard, ImmutableHashSet<IActorRef>.Empty);
coordinator.Log.Debug("Rebalance shard [{0}] from [{1}]", shard, rebalanceFromRegion);

var regions = coordinator.CurrentState.Regions.Keys.Union(coordinator.CurrentState.RegionProxies);
Expand All @@ -433,7 +476,7 @@ private static void ContinueRebalance<TCoordinator>(this TCoordinator coordinato

private static void ContinueGetShardHome<TCoordinator>(this TCoordinator coordinator, string shard, IActorRef region, IActorRef getShardHomeSender) where TCoordinator : IShardCoordinator
{
if (!coordinator.RebalanceInProgress.Contains(shard))
if (!coordinator.RebalanceInProgress.ContainsKey(shard))
{
if (coordinator.CurrentState.Shards.TryGetValue(shard, out var aref))
{
Expand All @@ -456,6 +499,10 @@ private static void ContinueGetShardHome<TCoordinator>(this TCoordinator coordin
coordinator.Log.Debug("Allocated region {0} for shard [{1}] is not (any longer) one of the registered regions", region, shard);
}
}
else
{
coordinator.DeferGetShardHomeRequest(shard, getShardHomeSender);
}
}

#endregion
Expand Down