Skip to content

Commit

Permalink
fixed WeaklyUp specs
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Sep 15, 2017
1 parent d8f0efe commit 3d960bf
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 155 deletions.
40 changes: 23 additions & 17 deletions src/core/Akka.Cluster.Tests.MultiNode/MemberWeaklyUpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,41 @@ namespace Akka.Cluster.Tests.MultiNode
{
public class MemberWeaklyUpConfig : MultiNodeConfig
{
public RoleName First { get; } = new RoleName("first");
public RoleName Second { get; } = new RoleName("second");
public RoleName Third { get; } = new RoleName("third");
public RoleName Fourth { get; } = new RoleName("fourth");
public RoleName Fifth { get; } = new RoleName("fifth");
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }
public RoleName Fourth { get; }
public RoleName Fifth { get; }

public MemberWeaklyUpConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");
Fourth = Role("fourth");
Fifth = Role("fifth");

CommonConfig = DebugConfig(on: false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.remote.retry-gate-closed-for = 3 s
akka.remote.retry-gate-closed-for = 3s
akka.cluster.allow-weakly-up-members = on"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

TestTransport = true;
}
}
public class MemberWeaklyUpMultiNode1 : MemberWeaklyUpSpec { }
public class MemberWeaklyUpMultiNode2 : MemberWeaklyUpSpec { }
public class MemberWeaklyUpMultiNode3 : MemberWeaklyUpSpec { }
public class MemberWeaklyUpMultiNode4 : MemberWeaklyUpSpec { }
public class MemberWeaklyUpMultiNode5 : MemberWeaklyUpSpec { }

public abstract class MemberWeaklyUpSpec : MultiNodeClusterSpec
public class MemberWeaklyUpSpec : MultiNodeClusterSpec
{
private readonly MemberWeaklyUpConfig _config;
private readonly ImmutableArray<RoleName> _side1;
private readonly ImmutableArray<RoleName> _side2;

protected MemberWeaklyUpSpec() : this(new MemberWeaklyUpConfig())
public MemberWeaklyUpSpec() : this(new MemberWeaklyUpConfig())
{
}

protected MemberWeaklyUpSpec(MemberWeaklyUpConfig config) : base(config, typeof(MemberWeaklyUpSpec))
private MemberWeaklyUpSpec(MemberWeaklyUpConfig config) : base(config, typeof(MemberWeaklyUpSpec))
{
_config = config;
_side1 = ImmutableArray.CreateRange(new[] { config.First, config.Second });
Expand Down Expand Up @@ -139,9 +140,14 @@ public void A_cluster_of_3_members_should_change_status_to_Up_after_healed_netwo
{
Within(TimeSpan.FromSeconds(20), () =>
{
foreach (var role1 in _side1)
foreach (var role2 in _side2)
TestConductor.PassThrough(role1, role2, ThrottleTransportAdapter.Direction.Both).Wait(TimeSpan.FromSeconds(3));
RunOn(() =>
{
foreach (var role1 in _side1)
foreach (var role2 in _side2)
{
TestConductor.PassThrough(role1, role2, ThrottleTransportAdapter.Direction.Both).Wait(TimeSpan.FromSeconds(3));
}
}, _config.First);

EnterBarrier("after-passThrough");

Expand Down
11 changes: 10 additions & 1 deletion src/core/Akka.Remote.TestKit/CommandLine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Specialized;
using Akka.Configuration;

namespace Akka.Remote.TestKit
{
Expand All @@ -33,7 +34,15 @@ public class CommandLine
{
if (!arg.StartsWith("-D")) continue;
var tokens = arg.Substring(2).Split('=');
dictionary.Add(tokens[0], tokens[1]);

if (tokens.Length == 2)
{
dictionary.Add(tokens[0], tokens[1]);
}
else
{
throw new ConfigurationException($"Command line parameter '{arg}' should follow the pattern [-Dmultinode.<key>=<value>].");
}
}
return dictionary;
});
Expand Down
184 changes: 80 additions & 104 deletions src/core/Akka.Remote.TestKit/Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,80 +146,60 @@ internal interface IHaveNodeInfo
NodeInfo Node { get; }
}

internal sealed class NodeInfo
internal sealed class NodeInfo : IEquatable<NodeInfo>
{
readonly RoleName _name;
readonly Address _addr;
readonly IActorRef _fsm;

public NodeInfo(RoleName name, Address addr, IActorRef fsm)
{
_name = name;
_addr = addr;
_fsm = fsm;
Name = name;
Addr = addr;
FSM = fsm;
}

public RoleName Name
{
get { return _name; }
}
public RoleName Name { get; }

public Address Addr
{
get { return _addr; }
}
public Address Addr { get; }

public IActorRef FSM
{
get { return _fsm; }
}
public IActorRef FSM { get; }

bool Equals(NodeInfo other)
public bool Equals(NodeInfo other)
{
return Equals(_name, other._name) && Equals(_addr, other._addr) && Equals(_fsm, other._fsm);
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Equals(Name, other.Name) && Equals(Addr, other.Addr) && Equals(FSM, other.FSM);
}

/// <inheritdoc/>
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
return obj is NodeInfo && Equals((NodeInfo) obj);
}
public override bool Equals(object obj) => obj is NodeInfo node && Equals(node);

/// <inheritdoc/>
public override int GetHashCode()
{
unchecked
{
int hashCode = (_name != null ? _name.GetHashCode() : 0);
hashCode = (hashCode*397) ^ (_addr != null ? _addr.GetHashCode() : 0);
hashCode = (hashCode*397) ^ (_fsm != null ? _fsm.GetHashCode() : 0);
int hashCode = (Name != null ? Name.GetHashCode() : 0);
hashCode = (hashCode*397) ^ (Addr != null ? Addr.GetHashCode() : 0);
hashCode = (hashCode*397) ^ (FSM != null ? FSM.GetHashCode() : 0);
return hashCode;
}
}

public override string ToString() => $"NodeInfo({Name}, {Addr})";

/// <summary>
/// Compares two specified <see cref="NodeInfo"/> for equality.
/// </summary>
/// <param name="left">The first <see cref="NodeInfo"/> used for comparison</param>
/// <param name="right">The second <see cref="NodeInfo"/> used for comparison</param>
/// <returns><c>true</c> if both <see cref="NodeInfo"/> are equal; otherwise <c>false</c></returns>
public static bool operator ==(NodeInfo left, NodeInfo right)
{
return Equals(left, right);
}
public static bool operator ==(NodeInfo left, NodeInfo right) => Equals(left, right);

/// <summary>
/// Compares two specified <see cref="NodeInfo"/> for inequality.
/// </summary>
/// <param name="left">The first <see cref="NodeInfo"/> used for comparison</param>
/// <param name="right">The second <see cref="NodeInfo"/> used for comparison</param>
/// <returns><c>true</c> if both <see cref="NodeInfo"/> are not equal; otherwise <c>false</c></returns>
public static bool operator !=(NodeInfo left, NodeInfo right)
{
return !Equals(left, right);
}
public static bool operator !=(NodeInfo left, NodeInfo right) => !Equals(left, right);
}

public sealed class CreateServerFSM : INoSerializationVerificationNeeded
Expand Down Expand Up @@ -270,23 +250,23 @@ protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(e =>
{
var barrierTimeout = e as BarrierCoordinator.BarrierTimeoutException;
if (barrierTimeout != null) return FailBarrier(barrierTimeout.BarrierData);
var failedBarrier = e as BarrierCoordinator.FailedBarrierException;
if (failedBarrier != null) return FailBarrier(failedBarrier.BarrierData);
var barrierEmpty = e as BarrierCoordinator.BarrierEmptyException;
if(barrierEmpty != null) return Directive.Resume;
var wrongBarrier = e as BarrierCoordinator.WrongBarrierException;
if (wrongBarrier != null)
switch (e)
{
wrongBarrier.Client.Tell(new ToClient<BarrierResult>(new BarrierResult(wrongBarrier.Barrier, false)));
return FailBarrier(wrongBarrier.BarrierData);
case BarrierCoordinator.BarrierTimeoutException barrierTimeout:
return FailBarrier(barrierTimeout.BarrierData);
case BarrierCoordinator.FailedBarrierException failedBarrier:
return FailBarrier(failedBarrier.BarrierData);
case BarrierCoordinator.BarrierEmptyException barrierEmpty:
return Directive.Resume;
case BarrierCoordinator.WrongBarrierException wrongBarrier:
wrongBarrier.Client.Tell(new ToClient<BarrierResult>(new BarrierResult(wrongBarrier.Barrier, false)));
return FailBarrier(wrongBarrier.BarrierData);
case BarrierCoordinator.ClientLostException clientLost:
return FailBarrier(clientLost.BarrierData);
case BarrierCoordinator.DuplicateNodeException duplicateNode:
return FailBarrier(duplicateNode.BarrierData);
default: throw new InvalidOperationException($"Cannot process exception of type {e.GetType()}");
}
var clientLost = e as BarrierCoordinator.ClientLostException;
if (clientLost != null) return FailBarrier(clientLost.BarrierData);
var duplicateNode = e as BarrierCoordinator.DuplicateNodeException;
if (duplicateNode != null) return FailBarrier(duplicateNode.BarrierData);
throw new InvalidOperationException($"Cannot process exception of type {e.GetType()}");
});
}

Expand Down Expand Up @@ -351,63 +331,59 @@ protected override void OnReceive(object message)
}
if (message is IServerOp)
{
if (message is EnterBarrier)
{
_barrier.Forward(message);
return;
}
if (message is FailBarrier)
switch (message)
{
_barrier.Forward(message);
return;
}
var getAddress = message as GetAddress;
if (getAddress != null)
{
var node = getAddress.Node;
if (_nodes.TryGetValue(node, out var replyNodeInfo))
Sender.Tell(new ToClient<AddressReply>(new AddressReply(node, replyNodeInfo.Addr)));
else
{
_addrInterest = _addrInterest.SetItem(node,
(_addrInterest.TryGetValue(node, out var existing)
? existing
: ImmutableHashSet.Create<IActorRef>()
case EnterBarrier _: _barrier.Forward(message); return;
case FailBarrier _: _barrier.Forward(message); return;
case GetAddress getAddress:
var node = getAddress.Node;
if (_nodes.TryGetValue(node, out var replyNodeInfo))
Sender.Tell(new ToClient<AddressReply>(new AddressReply(node, replyNodeInfo.Addr)));
else
{
_addrInterest = _addrInterest.SetItem(node,
(_addrInterest.TryGetValue(node, out var existing)
? existing
: ImmutableHashSet.Create<IActorRef>()
).Add(Sender));
}
return;
}
return;
case Done _: return; //FIXME what should happen?
}
if (message is Done) return; //FIXME what should happen?
}
if (message is ICommandOp)
{
var throttle = message as Throttle;
if (throttle != null)
{
var t = _nodes[throttle.Target];
_nodes[throttle.Node].FSM.Forward(new ToClient<ThrottleMsg>(new ThrottleMsg(t.Addr, throttle.Direction, throttle.RateMBit)));
return;
}
var disconnect = message as Disconnect;
if (disconnect != null)
{
var t = _nodes[disconnect.Target];
_nodes[disconnect.Node].FSM.Forward((new ToClient<DisconnectMsg>(new DisconnectMsg(t.Addr, disconnect.Abort))));
return;
}
var terminate = message as Terminate;
if (terminate != null)
switch (message)
{
_barrier.Tell(new BarrierCoordinator.RemoveClient(terminate.Node));
_nodes[terminate.Node].FSM.Forward(new ToClient<TerminateMsg>(new TerminateMsg(terminate.ShutdownOrExit)));
_nodes = _nodes.Remove(terminate.Node);
return;
}
var remove = message as Remove;
if (remove != null)
{
_barrier.Tell(new BarrierCoordinator.RemoveClient(remove.Node));
return;
case Throttle throttle:
{
if (!_nodes.TryGetValue(throttle.Target, out var target)) throw new IllegalActorStateException($"Throttle target {throttle.Target} was not found among nodes registered in {nameof(Controller)}: {string.Join(", ", _nodes.Keys)}");
if (!_nodes.TryGetValue(throttle.Node, out var source)) throw new IllegalActorStateException($"Throttle source {throttle.Node} was not found among nodes registered in {nameof(Controller)}: {string.Join(", ", _nodes.Keys)}");

source.FSM.Forward(new ToClient<ThrottleMsg>(new ThrottleMsg(target.Addr, throttle.Direction, throttle.RateMBit)));
return;
}
case Disconnect disconnect:
{
if (!_nodes.TryGetValue(disconnect.Target, out var target)) throw new IllegalActorStateException($"Disconnect target {disconnect.Target} was not found among nodes registered in {nameof(Controller)}: {string.Join(", ", _nodes.Keys)}");
if (!_nodes.TryGetValue(disconnect.Node, out var source)) throw new IllegalActorStateException($"Disconnect source {disconnect.Node} was not found among nodes registered in {nameof(Controller)}: {string.Join(", ", _nodes.Keys)}");

source.FSM.Forward((new ToClient<DisconnectMsg>(new DisconnectMsg(target.Addr, disconnect.Abort))));
return;
}
case Terminate terminate:
{
_barrier.Tell(new BarrierCoordinator.RemoveClient(terminate.Node));

if (!_nodes.TryGetValue(terminate.Node, out var node)) throw new IllegalActorStateException($"Terminate target {terminate.Node} was not found among nodes registered in {nameof(Controller)}: {string.Join(", ", _nodes.Keys)}");

node.FSM.Forward(new ToClient<TerminateMsg>(new TerminateMsg(terminate.ShutdownOrExit)));
_nodes = _nodes.Remove(terminate.Node);
return;
}
case Remove remove:
_barrier.Tell(new BarrierCoordinator.RemoveClient(remove.Node));
return;
}
}
if (message is GetNodes)
Expand Down
Loading

0 comments on commit 3d960bf

Please sign in to comment.