Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cluster] Enable HeartbeatResponse message type #6063

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
20 changes: 18 additions & 2 deletions src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,28 @@
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class ClusterHeartBeatSenderStateSpec : ClusterSpecBase
public class ClusterHeartBeatSenderStateSpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateSpec()
public ClusterHeartBeatSenderStateSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartBeatSenderStateLegacySpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartBeatSenderStateBase : ClusterSpecBase
{
protected ClusterHeartBeatSenderStateBase(ITestOutputHelper output, bool useLegacyMessage)
: base(output, useLegacyMessage)
{
_emptyState = EmptyState(aa);
}
Expand Down
25 changes: 21 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
public class ClusterHeartbeatReceiverSpec : ClusterHeartbeatReceiverBase
{
public static Config Config = @"akka.actor.provider = cluster";
public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatReceiverLegacySpec : ClusterHeartbeatReceiverBase
{
public ClusterHeartbeatReceiverLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatReceiverBase : AkkaSpec
{
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.actor.provider = cluster
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
protected ClusterHeartbeatReceiverBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(Config(useLegacyHeartbeat), output)
{

}
Expand Down
23 changes: 19 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,21 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
public class ClusterHeartbeatSenderSpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatSenderLegacySpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatSenderBase : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
Expand All @@ -40,14 +54,15 @@ protected override ActorSelection HeartbeatReceiver(Address address)
}
}

public static readonly Config Config = @"
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }
protected ClusterHeartbeatSenderBase(ITestOutputHelper output, bool useLegacyMessage)
: base(Config(useLegacyMessage), output){ }

[Fact]
public async Task ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
Expand Down
16 changes: 10 additions & 6 deletions src/core/Akka.Cluster.Tests/ClusterSpecBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Akka.Configuration;
using Akka.TestKit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -15,20 +16,23 @@ namespace Akka.Cluster.Tests
/// </summary>
public abstract class ClusterSpecBase : AkkaSpec
{
protected ClusterSpecBase(Config config) : base(config.WithFallback(BaseConfig))
protected ClusterSpecBase(Config config, ITestOutputHelper output, bool useLegacyHeartbeat)
: base(config.WithFallback(BaseConfig(useLegacyHeartbeat)), output)
{

}

protected ClusterSpecBase()
: base(BaseConfig)
protected ClusterSpecBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(BaseConfig(useLegacyHeartbeat), output)
{

}

protected static readonly Config BaseConfig = ConfigurationFactory.ParseString(@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on");
private static Config BaseConfig(bool useLegacyHeartbeat) =>
ConfigurationFactory.ParseString($@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}");
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}
}

19 changes: 17 additions & 2 deletions src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,27 @@
using Akka.Actor;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class HeartbeatNodeRingSpec : ClusterSpecBase
public class HeartbeatNodeRingSpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingSpec()
public HeartbeatNodeRingSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class HeartbeatNodeRingLegacySpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class HeartbeatNodeRingBase : ClusterSpecBase
{
protected HeartbeatNodeRingBase(ITestOutputHelper output, bool useLegacyMessage) : base(output, useLegacyMessage)
{
_nodes = ImmutableHashSet.Create(aa, bb, cc, dd, ee, ff);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,32 @@
using Xunit;
using FluentAssertions;
using Akka.Util;
using Akka.Util.Internal;
using Google.Protobuf;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests.Serialization
{
public class ClusterMessageSerializerSpec : AkkaSpec
public class ClusterMessageSerializerSpec: ClusterMessageSerializerBase
{
public ClusterMessageSerializerSpec()
: base(@"akka.actor.provider = cluster")
public ClusterMessageSerializerSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterMessageSerializerLegacySpec: ClusterMessageSerializerBase
{
public ClusterMessageSerializerLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterMessageSerializerBase : AkkaSpec
{
public ClusterMessageSerializerBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base($@"
akka.actor.provider = cluster
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}", output)
{
}

Expand All @@ -40,20 +58,6 @@ public void Can_serialize_Heartbeat()
AssertEqual(message);
}

[Fact]
public void Can_serialize_Hearbeatv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
}

[Fact]
public void Can_serialize_HeartbeatRsp()
{
Expand All @@ -63,20 +67,6 @@ public void Can_serialize_HeartbeatRsp()
AssertEqual(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only succeeds because -1,, -1 are the hard-coded values when legacy==on. Need to harden this to make sure that the correct, non--1 values are supported when legacy==off.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AssertEqual regardless of config is what's not going to work here. Change

 var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);

to

 var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, 10, 3);

When legacy off, 10,3

When legacy on, -1, -1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

[Fact]
public void Can_serialize_HearbeatRspv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
}

[Fact]
public void Can_serialize_GossipEnvelope()
{
Expand Down Expand Up @@ -220,10 +210,12 @@ public void Can_serialize_ClusterRouterPoolWithEmptyRole()

private T AssertAndReturn<T>(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
var serializer = (SerializerWithStringManifest) Sys.Serialization.FindSerializerFor(message);
serializer.Should().BeOfType<ClusterMessageSerializer>();
return serializer.FromBinary<T>(serialized);

var serialized = serializer.ToBinary(message);
var manifest = serializer.Manifest(message);
return (T) serializer.FromBinary(serialized, manifest);
}

private void AssertEqual<T>(T message)
Expand Down
22 changes: 21 additions & 1 deletion src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,33 @@
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class SerializationChecksSpec : ClusterSpecBase
public class SerializationChecksSpec : SerializationChecksBase
{
public SerializationChecksSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class SerializationChecksLegacySpec : SerializationChecksBase
{
public SerializationChecksLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class SerializationChecksBase : ClusterSpecBase
{
protected SerializationChecksBase(ITestOutputHelper output, bool useLegacyHeartbeat) : base(output, useLegacyHeartbeat)
{
}

[Fact]
public void Settings_serializemessages_and_serializecreators_must_be_on_for_tests()
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.Cluster/ClusterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ TimeSpan GetWeaklyUpDuration()

WeaklyUpAfter = GetWeaklyUpDuration();

UseLegacyHeartbeatMessage = clusterConfig.GetBoolean("use-legacy-heartbeat-message", false);
}

/// <summary>
Expand Down Expand Up @@ -300,6 +301,8 @@ TimeSpan GetWeaklyUpDuration()
/// The leader will move <see cref="MemberStatus.WeaklyUp"/> members to <see cref="MemberStatus.Up"/> status once convergence has been reached.
/// </summary>
public TimeSpan WeaklyUpAfter { get; }

public bool UseLegacyHeartbeatMessage { get; }
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}
}

2 changes: 2 additions & 0 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ akka {
# greater than this value.
reduce-gossip-different-view-probability = 400

use-legacy-heartbeat-message = false
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved

# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
# members.
Expand Down
Loading