Skip to content

Commit

Permalink
Zbynek001 sharding update2 (#5857)
Browse files Browse the repository at this point in the history
* Add Dropped to Akka.Actor (migrated partially from akka/akka#27160)
Log Dropped from DeadLetterListener

* Logging of UnhandledMessage (migrated from akka/akka#28414)
* make use of the existing logging of dead letter
  also for UnhandledMessage

Suppress ActorSelectionMessage with DeadLetterSuppression (migrated from akka/akka#28341)
* for example the Cluster InitJoin message is marked with DeadLetterSuppression
  but was logged anyway because it was sent via actorSelection
* for WrappedMessage types other than ActorSelectionMessage we shouldn't unwrap and publish
  the inner message in SuppressedDeadLetter because that might lose some information
* therefore those are silenced in the DeadLetterListener instead

Better deadLetter logging of wrapped messages (migrated from akka/akka#28253)

* MessageBuffer implementations

* TestKit logger with prefix

* sharding update

* sharding tests

* sharding multinode tests

* api approval

* replace sqlite with MemoryJournalShared and local snapshot store

* tests

* snapshot inmem

* backwards compatible PersistenceId for PersistentShardCoordinator

* test fix

* SnapshotStoreProxy & MemorySnapshotStoreShared

* test snapshot store switched to shared inmem

* ExternalShardAllocationStrategy & tests

* ExternalShardAllocationStrategy API approval

* test timing fix

* review comments addressed

* IEquatable removed for singleton messages

* test fixes

* cleanup

* test cleanup

* protobuf generated

* cleanup

* cleanup

* Race condition in DeprecatedSupervisionSpec fixed (migrated from akka/akka#29914)

* cleanup

* Small clarification of recovery strategy in config (migrated from akka/akka#30167)

* Resolve snapshot check skipped for some events (migrated from akka/akka#30226)

* additional sharding messages serialization, tests

* api approval update

* disable durable storage on ShardRegionSpec

* extend timeout for ExternalShardAllocationSpec

* naming conventions

* missing readonly added, updated syntax

* renaming conventions

* Defer coordinator stop until region graceful stop has completed (migrated from akka/akka#30338)

* sharding: actively signal 'region stopped' to the coordinator (migrated from akka/akka#30402)

* racy test fix

* racy test verbose logging

* test update

* merge fix

* sharding ddata coordinator switch to ReadMajorityPlus/WriteMajorityPlus

* more logs to debug tests

* more logs

* fix MultiNodeClusterSpec default timeouts

* revert additional logs

* override single-expect-default only for sharding tests

* revert unrelated protobuf serializers

* Fix StartEntitySpec instability (migrated from akka/akka#30537)

The old logic allowed a race condition where the 'StartEntity' from the
test arrived at the ShardRegion before the termination of the actor did,
causing it to ignore the `StartEntity`.

* Quieter logging for ShardCoordinator initialization (migrated from akka/akka#30488)

Log the first retry on 'info', then 'warning', and finally 'error'

* reduce default write-majority-plus for sharding (migrated from akka/akka#30328)

* merge fix

* rebalance log fix

* fixed compilation error from rebase

* switch RememberEntitiesShardIdExtractorChangeSpec from ddata to persistence

* disable durable storage on PersistentShardingMigrationSpec

* clean up leveldb configuration

* fix XML-DOC warnings

Co-authored-by: zbynek001 <zbynek001@gmail.com>
Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
3 people authored Apr 20, 2022
1 parent 7b7e0ec commit 2cbdbcb
Show file tree
Hide file tree
Showing 104 changed files with 18,515 additions and 7,998 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using System.Runtime.Serialization;
using Akka.Event;
using Akka.Persistence.Journal;
using Akka.Persistence;
using System.Threading;
using Akka.Util.Internal;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Persistence;
using Akka.Persistence.Journal;

namespace Akka.Cluster.Sharding.Tests
{
Expand Down Expand Up @@ -86,6 +84,12 @@ public SetStore(IActorRef store)
/// </summary>
public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash
{
        // Singleton marker message, sent to self only: signals that the journal's
        // init wait has elapsed (presumably before a SetStore arrived — it flips
        // _isInitTimedOut; confirm against the timer setup outside this view).
        private class InitTimeout
        {
            public static readonly InitTimeout Instance = new InitTimeout();
            private InitTimeout() { }
        }

private bool _isInitialized;
private bool _isInitTimedOut;
private IActorRef _store;
Expand Down Expand Up @@ -278,27 +282,6 @@ private Task<T> StoreNotInitialized<T>()
/// TBD
/// </summary>
public IStash Stash { get; set; }

// sent to self only
/// <summary>
/// TBD
/// </summary>
public class InitTimeout
{
private InitTimeout() { }
private static readonly InitTimeout _instance = new InitTimeout();

/// <summary>
/// TBD
/// </summary>
public static InitTimeout Instance
{
get
{
return _instance;
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -338,7 +321,6 @@ protected override bool Receive(object message)
switch (message)
{
case ReplayedMessage rm:
//rm.Persistent
_replayCallback(rm.Persistent);
return true;
case RecoverySuccess _:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardCoordinatorDowning2Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Remote.TestKit;
using Akka.Util;
using FluentAssertions;
using static Akka.Remote.Transport.ThrottleTransportAdapter;

namespace Akka.Cluster.Sharding.Tests
{
    /// <summary>
    /// Two-node multinode config for the coordinator-downing scenario 2 spec.
    /// Rebalance is effectively disabled (120 s interval) so that only downing —
    /// not rebalancing — can move shards; down-removal-margin is set for issue #29131.
    /// </summary>
    public class ClusterShardCoordinatorDowning2SpecConfig : MultiNodeClusterShardingConfig
    {
        public RoleName First { get; }
        public RoleName Second { get; }

        /// <param name="mode">Coordinator state store mode (Persistence or DData).</param>
        public ClusterShardCoordinatorDowning2SpecConfig(StateStoreMode mode)
            : base(mode: mode, loglevel: "DEBUG", additionalConfig: @"
            akka.cluster.sharding.rebalance-interval = 120 s
            # setting down-removal-margin, for testing of issue #29131
            akka.cluster.down-removal-margin = 3 s
            akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
            ")
        {
            First = Role("first");
            Second = Role("second");

            // enables the throttle transport adapter so the test can blackhole links
            TestTransport = true;
        }
    }

    /// <summary>
    /// <see cref="ClusterShardCoordinatorDowning2SpecConfig"/> variant using the
    /// persistence-backed coordinator state store.
    /// </summary>
    public class PersistentClusterShardCoordinatorDowning2SpecConfig : ClusterShardCoordinatorDowning2SpecConfig
    {
        public PersistentClusterShardCoordinatorDowning2SpecConfig()
            : base(StateStoreMode.Persistence)
        {
        }
    }

    /// <summary>
    /// <see cref="ClusterShardCoordinatorDowning2SpecConfig"/> variant using the
    /// distributed-data (ddata) coordinator state store.
    /// </summary>
    public class DDataClusterShardCoordinatorDowning2SpecConfig : ClusterShardCoordinatorDowning2SpecConfig
    {
        public DDataClusterShardCoordinatorDowning2SpecConfig()
            : base(StateStoreMode.DData)
        {
        }
    }

    /// <summary>
    /// Concrete runner of <see cref="ClusterShardCoordinatorDowning2Spec"/> with the
    /// persistence-backed coordinator state store.
    /// </summary>
    public class PersistentClusterShardCoordinatorDowning2Spec : ClusterShardCoordinatorDowning2Spec
    {
        public PersistentClusterShardCoordinatorDowning2Spec()
            : base(new PersistentClusterShardCoordinatorDowning2SpecConfig(), typeof(PersistentClusterShardCoordinatorDowning2Spec))
        {
        }
    }

    /// <summary>
    /// Concrete runner of <see cref="ClusterShardCoordinatorDowning2Spec"/> with the
    /// distributed-data (ddata) coordinator state store.
    /// </summary>
    public class DDataClusterShardCoordinatorDowning2Spec : ClusterShardCoordinatorDowning2Spec
    {
        public DDataClusterShardCoordinatorDowning2Spec()
            : base(new DDataClusterShardCoordinatorDowning2SpecConfig(), typeof(DDataClusterShardCoordinatorDowning2Spec))
        {
        }
    }

public abstract class ClusterShardCoordinatorDowning2Spec : MultiNodeClusterShardingSpec<ClusterShardCoordinatorDowning2SpecConfig>
{
#region setup

internal sealed class Ping
{
public readonly string Id;

public Ping(string id)
{
Id = id;
}
}

internal class Entity : ActorBase
{
protected override bool Receive(object message)
{
if (message is Ping)
{
Sender.Tell(Self);
return true;
}
return false;
}
}

internal class GetLocations
{
public static readonly GetLocations Instance = new GetLocations();

private GetLocations()
{
}
}

internal class Locations
{
public Locations(ImmutableDictionary<string, IActorRef> locations)
{
Locs = locations;
}

public ImmutableDictionary<string, IActorRef> Locs { get; }
}

internal class ShardLocations : ActorBase
{
private Locations locations;

public ShardLocations()
{
}

protected override bool Receive(object message)
{
switch (message)
{
case GetLocations _:
Sender.Tell(locations);
return true;
case Locations l:
locations = l;
return true;
}
return false;
}
}

private ExtractEntityId extractEntityId = message =>
{
switch (message)
{
case Ping p:
return (p.Id, message);
}
return Option<(string, object)>.None;
};

internal ExtractShardId extractShardId = message =>
{
switch (message)
{
case Ping p:
return p.Id[0].ToString();
}
return null;
};

private readonly Lazy<IActorRef> _region;

protected ClusterShardCoordinatorDowning2Spec(ClusterShardCoordinatorDowning2SpecConfig config, Type type)
: base(config, type)
{
_region = new Lazy<IActorRef>(() => ClusterSharding.Get(Sys).ShardRegion("Entity"));
}

private void StartSharding()
{
StartSharding(
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
}

#endregion

[MultiNodeFact]
public void Cluster_sharding_with_down_member_scenario_2_specs()
{
Cluster_sharding_with_down_member_scenario_2_must_join_cluster();
Cluster_sharding_with_down_member_scenario_2_must_initialize_shards();
Cluster_sharding_with_down_member_scenario_2_must_recover_after_downing_other_node_not_coordinator();
}

private void Cluster_sharding_with_down_member_scenario_2_must_join_cluster()
{
Within(TimeSpan.FromSeconds(20), () =>
{
StartPersistenceIfNeeded(startOn: config.First, config.First, config.Second);

Join(config.First, config.First, onJoinedRunOnFrom: StartSharding);
Join(config.Second, config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);

// all Up, everywhere before continuing
RunOn(() =>
{
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(2);
Cluster.State.Members.Should().OnlyContain(m => m.Status == MemberStatus.Up);
});
}, config.First, config.Second);

EnterBarrier("after-2");
});
}

private void Cluster_sharding_with_down_member_scenario_2_must_initialize_shards()
{
RunOn(() =>
{
var shardLocations = Sys.ActorOf(Props.Create(() => new ShardLocations()), "shardLocations");
var locations = Enumerable.Range(1, 4).Select(n =>
{
var id = n.ToString();
_region.Value.Tell(new Ping(id));
return new KeyValuePair<string, IActorRef>(id, ExpectMsg<IActorRef>());
}).ToImmutableDictionary();
shardLocations.Tell(new Locations(locations));
Sys.Log.Debug("Original locations: [{0}]", string.Join(", ", locations.Select(i => $"{i.Key}: {i.Value}")));
}, config.First);
EnterBarrier("after-3");
}

private void Cluster_sharding_with_down_member_scenario_2_must_recover_after_downing_other_node_not_coordinator()
{
Within(TimeSpan.FromSeconds(20), () =>
{
var secondAddress = GetAddress(config.Second);

RunOn(() =>
{
TestConductor.Blackhole(config.First, config.Second, Direction.Both).Wait();
}, config.First);

Thread.Sleep(3000);

RunOn(() =>
{
Cluster.Down(GetAddress(config.Second));
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(1);
});

// start a few more new shards, could be allocated to second but should notice that it's terminated
ImmutableDictionary<string, IActorRef> additionalLocations = null;
AwaitAssert(() =>
{
var probe = CreateTestProbe();
additionalLocations = Enumerable.Range(5, 4).Select(n =>
{
var id = n.ToString();
_region.Value.Tell(new Ping(id), probe.Ref);
return new KeyValuePair<string, IActorRef>(id, probe.ExpectMsg<IActorRef>(TimeSpan.FromSeconds(1)));
}).ToImmutableDictionary();
});
Sys.Log.Debug("Additional locations: [{0}]", string.Join(", ", additionalLocations.Select(i => $"{i.Key}: {i.Value}")));

Sys.ActorSelection(Node(config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
var originalLocations = ExpectMsg<Locations>().Locs;

AwaitAssert(() =>
{
var probe = CreateTestProbe();
foreach (var loc in originalLocations.SetItems(additionalLocations))
{
_region.Value.Tell(new Ping(loc.Key), probe.Ref);
if (loc.Value.Path.Address.Equals(secondAddress))
{
var newRef = probe.ExpectMsg<IActorRef>(TimeSpan.FromSeconds(1));
newRef.Should().NotBe(loc.Value);
Sys.Log.Debug("Moved [{0}] from [{1}] to [{2}]", loc.Key, loc.Value, newRef);
}
else
probe.ExpectMsg(loc.Value, TimeSpan.FromSeconds(1)); // should not move

}
});
}, config.First);
});

EnterBarrier("after-4");
}
}
}
Loading

0 comments on commit 2cbdbcb

Please sign in to comment.