Zbynek001 sharding update2 #5857

Merged 77 commits on Apr 20, 2022

Commits (77)
9919ddb
Add Dropped to Akka.Actor (migrated partially from https://github.com…
zbynek001 Nov 3, 2020
4762240
Logging of UnhandledMessage (migrated from https://github.com/akka/ak…
zbynek001 Nov 14, 2020
220dd0b
MessageBuffer implementations
zbynek001 Nov 11, 2020
c2d789f
TestKit logger with prefix
zbynek001 Nov 11, 2020
cc80e4b
sharding update
zbynek001 Nov 12, 2020
55b4537
sharding tests
zbynek001 Nov 12, 2020
ba28c66
sharding multinode tests
zbynek001 Nov 12, 2020
5cffe62
api approval
zbynek001 Nov 12, 2020
df19783
replace sqlite with MemoryJournalShared and local snapshot store
zbynek001 Nov 12, 2020
fb6ecc8
tests
zbynek001 Nov 13, 2020
5cb90af
snapshot inmem
zbynek001 Nov 13, 2020
ca3a3fa
backwards compatible PersistenceId for PersistentShardCoordinator
zbynek001 Nov 14, 2020
4d67fc2
test fix
zbynek001 Nov 14, 2020
2547e32
SnapshotStoreProxy & MemorySnapshotStoreShared
zbynek001 Nov 14, 2020
a0defd1
test snapshot store switched to shared inmem
zbynek001 Nov 14, 2020
02e2994
ExternalShardAllocationStrategy & tests
zbynek001 Nov 16, 2020
2bb32cb
ExternalShardAllocationStrategy API approval
zbynek001 Nov 16, 2020
aee94d3
test timing fix
zbynek001 Nov 16, 2020
f62f635
Merge branch 'dev' into sharding-update2
Aaronontheweb Dec 23, 2020
fc8d701
Merge branch 'dev' into sharding-update2
Arkatufus Dec 30, 2020
0ba18a7
review comments addressed
zbynek001 Jan 2, 2021
84a3330
IEquatable removed for singleton messages
zbynek001 Jan 3, 2021
0dca2a4
test fixes
zbynek001 Jan 3, 2021
5b4da3c
cleanup
zbynek001 Jan 3, 2021
1ab2252
test cleanup
zbynek001 Jan 4, 2021
6851a44
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Mar 8, 2021
cf60868
protobuf generated
zbynek001 Mar 8, 2021
3b30087
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Mar 10, 2021
20cc2c8
cleanup
zbynek001 Mar 10, 2021
704bfbf
Merge branch 'dev' of https://github.com/akkadotnet/akka.net into sha…
zbynek001 Apr 6, 2021
590f1e7
cleanup
zbynek001 Apr 6, 2021
5bdd442
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Jul 14, 2021
f799689
Race condition in DeprecatedSupervisionSpec fixed (migrated from http…
zbynek001 Jul 14, 2021
92e2a29
cleanup
zbynek001 Jul 14, 2021
2484e93
Small clarification of recovery strategy in config (migrated from htt…
zbynek001 Jul 14, 2021
759f8a9
Resolve snapshot check skipped for some events (migrated from https:/…
zbynek001 Jul 14, 2021
4d19943
additional sharding messages serialization, tests
zbynek001 Jul 14, 2021
cb35f11
api approval update
zbynek001 Jul 14, 2021
f84978b
disable durable storage on ShardRegionSpec
zbynek001 Jul 14, 2021
0a2f7bb
extend timeout for ExternalShardAllocationSpec
zbynek001 Jul 14, 2021
a610c06
naming conventions
zbynek001 Jul 15, 2021
3082d03
missing readonly added, updated syntax
zbynek001 Jul 15, 2021
c8575dd
renaming conventions
zbynek001 Jul 15, 2021
d01d619
Defer coordinator stop until region graceful stop has completed (migr…
zbynek001 Jul 28, 2021
e2efd10
sharding: actively signal 'region stopped' to the coordinator (migrat…
zbynek001 Jul 28, 2021
c68adab
racy test fix
zbynek001 Jul 28, 2021
fc58a9f
racy test verbose logging
zbynek001 Jul 28, 2021
e6fb13b
test update
zbynek001 Jul 28, 2021
cb4cac3
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Aug 3, 2021
be552bc
merge fix
zbynek001 Aug 3, 2021
c38268d
sharding ddata coordinator switch to ReadMajorityPlus/WriteMajorityPlus
zbynek001 Aug 3, 2021
44ccab1
more logs to debug tests
zbynek001 Aug 3, 2021
7c39c9f
more logs
zbynek001 Aug 3, 2021
9d0f515
Merge branch 'dev' into sharding-update2
Arkatufus Aug 3, 2021
c22b342
fix MultiNodeClusterSpec default timeouts
zbynek001 Aug 4, 2021
8916366
revert additional logs
zbynek001 Aug 4, 2021
b9631d6
override single-expect-default only for sharding tests
zbynek001 Aug 4, 2021
7b0e1b6
revert unrelated protobuf serializers
zbynek001 Aug 4, 2021
5b5a9a5
Merge branch 'dev' into sharding-update2
Arkatufus Aug 16, 2021
e11071e
Merge branch 'dev' into sharding-update2
Aaronontheweb Aug 18, 2021
05fd2ad
Fix StartEntitySpec instability (migrated from https://github.com/akk…
zbynek001 Aug 26, 2021
9335fb9
Quieter logging for ShardCoordinator initialization (migrated from ht…
zbynek001 Aug 26, 2021
ef73094
reduce default write-majority-plus for sharding (migrated from https:…
zbynek001 Aug 26, 2021
a6c1b8c
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Aug 26, 2021
95cd398
merge fix
zbynek001 Aug 26, 2021
fe8fc83
rebalance log fix
zbynek001 Sep 1, 2021
647346d
Merge branch 'dev' into sharding-update2
Aaronontheweb Feb 10, 2022
03abd3d
fixed compilation error from rebase
Aaronontheweb Feb 10, 2022
37af7c2
switch RememberEntitiesShardIdExtractorChangeSpec from ddata to persi…
zbynek001 Feb 10, 2022
645e3f6
disable durable storage on PersistentShardingMigrationSpec
zbynek001 Feb 10, 2022
2bfaf8d
clean up leveldb configuration
zbynek001 Feb 10, 2022
56304f9
Merge branch 'dev' into sharding-update2
Aaronontheweb Mar 4, 2022
ae4750b
Merge branch 'dev' into sharding-update2
Arkatufus Apr 5, 2022
c416295
Merge branch 'dev' into sharding-update2
Arkatufus Apr 12, 2022
4cdde2e
Merge branch 'sharding-update2' of https://github.com/zbynek001/akka.…
Aaronontheweb Apr 20, 2022
8be634f
Merge branch 'dev' into zbynek001-sharding-update2
Aaronontheweb Apr 20, 2022
6bedc7c
fix XML-DOC warnings
Aaronontheweb Apr 20, 2022
@@ -9,15 +9,13 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using System.Runtime.Serialization;
using Akka.Event;
using Akka.Persistence.Journal;
using Akka.Persistence;
using System.Threading;
using Akka.Util.Internal;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Persistence;
using Akka.Persistence.Journal;

namespace Akka.Cluster.Sharding.Tests
{
@@ -86,6 +84,12 @@ public SetStore(IActorRef store)
/// </summary>
public abstract class AsyncWriteProxyEx : AsyncWriteJournal, IWithUnboundedStash
{
private class InitTimeout
{
public static readonly InitTimeout Instance = new InitTimeout();
private InitTimeout() { }
}

private bool _isInitialized;
private bool _isInitTimedOut;
private IActorRef _store;
@@ -278,27 +282,6 @@ private Task<T> StoreNotInitialized<T>()
/// TBD
/// </summary>
public IStash Stash { get; set; }

// sent to self only
/// <summary>
/// TBD
/// </summary>
public class InitTimeout
{
private InitTimeout() { }
private static readonly InitTimeout _instance = new InitTimeout();

/// <summary>
/// TBD
/// </summary>
public static InitTimeout Instance
{
get
{
return _instance;
}
}
}
}

/// <summary>
@@ -338,7 +321,6 @@ protected override bool Receive(object message)
switch (message)
{
case ReplayedMessage rm:
//rm.Persistent
_replayCallback(rm.Persistent);
return true;
case RecoverySuccess _:
ClusterShardCoordinatorDowning2Spec.cs (new file)
@@ -0,0 +1,291 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardCoordinatorDowning2Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Remote.TestKit;
using Akka.Util;
using FluentAssertions;
using static Akka.Remote.Transport.ThrottleTransportAdapter;

namespace Akka.Cluster.Sharding.Tests
{
public class ClusterShardCoordinatorDowning2SpecConfig : MultiNodeClusterShardingConfig
{
public RoleName First { get; }
public RoleName Second { get; }

public ClusterShardCoordinatorDowning2SpecConfig(StateStoreMode mode)
: base(mode: mode, loglevel: "DEBUG", additionalConfig: @"
akka.cluster.sharding.rebalance-interval = 120 s
# setting down-removal-margin, for testing of issue #29131
akka.cluster.down-removal-margin = 3 s
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
")
{
First = Role("first");
Second = Role("second");

TestTransport = true;
}
}

public class PersistentClusterShardCoordinatorDowning2SpecConfig : ClusterShardCoordinatorDowning2SpecConfig
{
public PersistentClusterShardCoordinatorDowning2SpecConfig()
: base(StateStoreMode.Persistence)
{
}
}

public class DDataClusterShardCoordinatorDowning2SpecConfig : ClusterShardCoordinatorDowning2SpecConfig
{
public DDataClusterShardCoordinatorDowning2SpecConfig()
: base(StateStoreMode.DData)
{
}
}

public class PersistentClusterShardCoordinatorDowning2Spec : ClusterShardCoordinatorDowning2Spec
{
public PersistentClusterShardCoordinatorDowning2Spec()
: base(new PersistentClusterShardCoordinatorDowning2SpecConfig(), typeof(PersistentClusterShardCoordinatorDowning2Spec))
{
}
}

public class DDataClusterShardCoordinatorDowning2Spec : ClusterShardCoordinatorDowning2Spec
{
public DDataClusterShardCoordinatorDowning2Spec()
: base(new DDataClusterShardCoordinatorDowning2SpecConfig(), typeof(DDataClusterShardCoordinatorDowning2Spec))
{
}
}

public abstract class ClusterShardCoordinatorDowning2Spec : MultiNodeClusterShardingSpec<ClusterShardCoordinatorDowning2SpecConfig>
{
#region setup

internal sealed class Ping
{
public readonly string Id;

public Ping(string id)
{
Id = id;
}
}

internal class Entity : ActorBase
{
protected override bool Receive(object message)
{
if (message is Ping)
{
Sender.Tell(Self);
return true;
}
return false;
}
}

internal class GetLocations
{
public static readonly GetLocations Instance = new GetLocations();

private GetLocations()
{
}
}

internal class Locations
{
public Locations(ImmutableDictionary<string, IActorRef> locations)
{
Locs = locations;
}

public ImmutableDictionary<string, IActorRef> Locs { get; }
}

internal class ShardLocations : ActorBase
{
private Locations locations;

public ShardLocations()
{
}

protected override bool Receive(object message)
{
switch (message)
{
case GetLocations _:
Sender.Tell(locations);
return true;
case Locations l:
locations = l;
return true;
}
return false;
}
}

private ExtractEntityId extractEntityId = message =>
{
switch (message)
{
case Ping p:
return (p.Id, message);
}
return Option<(string, object)>.None;
};

internal ExtractShardId extractShardId = message =>
{
switch (message)
{
case Ping p:
return p.Id[0].ToString();
}
return null;
};

private readonly Lazy<IActorRef> _region;

protected ClusterShardCoordinatorDowning2Spec(ClusterShardCoordinatorDowning2SpecConfig config, Type type)
: base(config, type)
{
_region = new Lazy<IActorRef>(() => ClusterSharding.Get(Sys).ShardRegion("Entity"));
}

private void StartSharding()
{
StartSharding(
Sys,
typeName: "Entity",
entityProps: Props.Create(() => new Entity()),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
}

#endregion

[MultiNodeFact]
public void Cluster_sharding_with_down_member_scenario_2_specs()
{
Cluster_sharding_with_down_member_scenario_2_must_join_cluster();
Cluster_sharding_with_down_member_scenario_2_must_initialize_shards();
Cluster_sharding_with_down_member_scenario_2_must_recover_after_downing_other_node_not_coordinator();
}

private void Cluster_sharding_with_down_member_scenario_2_must_join_cluster()
{
Within(TimeSpan.FromSeconds(20), () =>
{
StartPersistenceIfNeeded(startOn: config.First, config.First, config.Second);

Join(config.First, config.First, onJoinedRunOnFrom: StartSharding);
Join(config.Second, config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);

// all Up, everywhere before continuing
RunOn(() =>
{
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(2);
Cluster.State.Members.Should().OnlyContain(m => m.Status == MemberStatus.Up);
});
}, config.First, config.Second);

EnterBarrier("after-2");
});
}

private void Cluster_sharding_with_down_member_scenario_2_must_initialize_shards()
{
RunOn(() =>
{
var shardLocations = Sys.ActorOf(Props.Create(() => new ShardLocations()), "shardLocations");
var locations = Enumerable.Range(1, 4).Select(n =>
{
var id = n.ToString();
_region.Value.Tell(new Ping(id));
return new KeyValuePair<string, IActorRef>(id, ExpectMsg<IActorRef>());
}).ToImmutableDictionary();
shardLocations.Tell(new Locations(locations));
Sys.Log.Debug("Original locations: [{0}]", string.Join(", ", locations.Select(i => $"{i.Key}: {i.Value}")));
}, config.First);
EnterBarrier("after-3");
}

private void Cluster_sharding_with_down_member_scenario_2_must_recover_after_downing_other_node_not_coordinator()
{
Within(TimeSpan.FromSeconds(20), () =>
{
var secondAddress = GetAddress(config.Second);

RunOn(() =>
{
TestConductor.Blackhole(config.First, config.Second, Direction.Both).Wait();
}, config.First);

Thread.Sleep(3000);

RunOn(() =>
{
Cluster.Down(GetAddress(config.Second));
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(1);
});

// start a few more new shards, could be allocated to second but should notice that it's terminated
ImmutableDictionary<string, IActorRef> additionalLocations = null;
AwaitAssert(() =>
{
var probe = CreateTestProbe();
additionalLocations = Enumerable.Range(5, 4).Select(n =>
{
var id = n.ToString();
_region.Value.Tell(new Ping(id), probe.Ref);
return new KeyValuePair<string, IActorRef>(id, probe.ExpectMsg<IActorRef>(TimeSpan.FromSeconds(1)));
}).ToImmutableDictionary();
});
Sys.Log.Debug("Additional locations: [{0}]", string.Join(", ", additionalLocations.Select(i => $"{i.Key}: {i.Value}")));

Sys.ActorSelection(Node(config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
var originalLocations = ExpectMsg<Locations>().Locs;

AwaitAssert(() =>
{
var probe = CreateTestProbe();
foreach (var loc in originalLocations.SetItems(additionalLocations))
{
_region.Value.Tell(new Ping(loc.Key), probe.Ref);
if (loc.Value.Path.Address.Equals(secondAddress))
{
var newRef = probe.ExpectMsg<IActorRef>(TimeSpan.FromSeconds(1));
newRef.Should().NotBe(loc.Value);
Sys.Log.Debug("Moved [{0}] from [{1}] to [{2}]", loc.Key, loc.Value, newRef);
}
else
probe.ExpectMsg(loc.Value, TimeSpan.FromSeconds(1)); // should not move

}
});
}, config.First);
});

EnterBarrier("after-4");
}
}
}