Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cluster] Enable HeartbeatResponse message type #6063

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace Akka.Cluster
public System.Nullable<System.TimeSpan> ShutdownAfterUnsuccessfulJoinSeedNodes { get; }
public System.TimeSpan UnreachableNodesReaperInterval { get; }
public string UseDispatcher { get; }
public bool UseLegacyHeartbeatMessage { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
Expand Down
20 changes: 18 additions & 2 deletions src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,28 @@
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class ClusterHeartBeatSenderStateSpec : ClusterSpecBase
public class ClusterHeartBeatSenderStateSpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateSpec()
public ClusterHeartBeatSenderStateSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartBeatSenderStateLegacySpec : ClusterHeartBeatSenderStateBase
{
public ClusterHeartBeatSenderStateLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartBeatSenderStateBase : ClusterSpecBase
{
protected ClusterHeartBeatSenderStateBase(ITestOutputHelper output, bool useLegacyMessage)
: base(output, useLegacyMessage)
{
_emptyState = EmptyState(aa);
}
Expand Down
25 changes: 21 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
public class ClusterHeartbeatReceiverSpec : ClusterHeartbeatReceiverBase
{
public static Config Config = @"akka.actor.provider = cluster";
public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatReceiverLegacySpec : ClusterHeartbeatReceiverBase
{
public ClusterHeartbeatReceiverLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatReceiverBase : AkkaSpec
{
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.actor.provider = cluster
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
protected ClusterHeartbeatReceiverBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(Config(useLegacyHeartbeat), output)
{

}
Expand Down
23 changes: 19 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,21 @@

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
public class ClusterHeartbeatSenderSpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class ClusterHeartbeatSenderLegacySpec : ClusterHeartbeatSenderBase
{
public ClusterHeartbeatSenderLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class ClusterHeartbeatSenderBase : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
Expand All @@ -40,14 +54,15 @@ protected override ActorSelection HeartbeatReceiver(Address address)
}
}

public static readonly Config Config = @"
private static Config Config(bool useLegacyHeartbeat) => $@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }
protected ClusterHeartbeatSenderBase(ITestOutputHelper output, bool useLegacyMessage)
: base(Config(useLegacyMessage), output){ }

[Fact]
public async Task ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
Expand Down
16 changes: 10 additions & 6 deletions src/core/Akka.Cluster.Tests/ClusterSpecBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Akka.Configuration;
using Akka.TestKit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -15,20 +16,23 @@ namespace Akka.Cluster.Tests
/// </summary>
public abstract class ClusterSpecBase : AkkaSpec
{
protected ClusterSpecBase(Config config) : base(config.WithFallback(BaseConfig))
protected ClusterSpecBase(Config config, ITestOutputHelper output, bool useLegacyHeartbeat)
: base(config.WithFallback(BaseConfig(useLegacyHeartbeat)), output)
{

}

protected ClusterSpecBase()
: base(BaseConfig)
protected ClusterSpecBase(ITestOutputHelper output, bool useLegacyHeartbeat)
: base(BaseConfig(useLegacyHeartbeat), output)
{

}

protected static readonly Config BaseConfig = ConfigurationFactory.ParseString(@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on");
private static Config BaseConfig(bool useLegacyHeartbeat) =>
ConfigurationFactory.ParseString($@"
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on
akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}");
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}
}

19 changes: 17 additions & 2 deletions src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,27 @@
using Akka.Actor;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class HeartbeatNodeRingSpec : ClusterSpecBase
public class HeartbeatNodeRingSpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingSpec()
public HeartbeatNodeRingSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class HeartbeatNodeRingLegacySpec : HeartbeatNodeRingBase
{
public HeartbeatNodeRingLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class HeartbeatNodeRingBase : ClusterSpecBase
{
protected HeartbeatNodeRingBase(ITestOutputHelper output, bool useLegacyMessage) : base(output, useLegacyMessage)
{
_nodes = ImmutableHashSet.Create(aa, bb, cc, dd, ee, ff);
}
Expand Down
22 changes: 21 additions & 1 deletion src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,33 @@
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
public class SerializationChecksSpec : ClusterSpecBase
public class SerializationChecksSpec : SerializationChecksBase
{
public SerializationChecksSpec(ITestOutputHelper output) : base(output, false)
{
}
}

public class SerializationChecksLegacySpec : SerializationChecksBase
{
public SerializationChecksLegacySpec(ITestOutputHelper output) : base(output, true)
{
}
}

public abstract class SerializationChecksBase : ClusterSpecBase
{
protected SerializationChecksBase(ITestOutputHelper output, bool useLegacyHeartbeat) : base(output, useLegacyHeartbeat)
{
}

[Fact]
public void Settings_serializemessages_and_serializecreators_must_be_on_for_tests()
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.Cluster/ClusterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ TimeSpan GetWeaklyUpDuration()

WeaklyUpAfter = GetWeaklyUpDuration();

UseLegacyHeartbeatMessage = clusterConfig.GetBoolean("use-legacy-heartbeat-message", true);
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down Expand Up @@ -300,6 +301,8 @@ TimeSpan GetWeaklyUpDuration()
/// The leader will move <see cref="MemberStatus.WeaklyUp"/> members to <see cref="MemberStatus.Up"/> status once convergence has been reached.
/// </summary>
public TimeSpan WeaklyUpAfter { get; }

public bool UseLegacyHeartbeatMessage { get; }
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}
}

2 changes: 2 additions & 0 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ akka {
# greater than this value.
reduce-gossip-different-view-probability = 400

use-legacy-heartbeat-message = true
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved

# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable
# members.
Expand Down
38 changes: 34 additions & 4 deletions src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ public class ClusterMessageSerializer : SerializerWithStringManifest
internal const string GossipEnvelopeManifest = "Akka.Cluster.GossipEnvelope, Akka.Cluster";
internal const string ClusterRouterPoolManifest = "Akka.Cluster.Routing.ClusterRouterPool, Akka.Cluster";

private Option<bool> _useLegacyHeartbeatMessageDontUseDirectly = Option<bool>.None;
private bool UseLegacyHeartbeatMessage
{
get
{
if(_useLegacyHeartbeatMessageDontUseDirectly.IsEmpty)
_useLegacyHeartbeatMessageDontUseDirectly = Cluster.Get(system).Settings.UseLegacyHeartbeatMessage;
return _useLegacyHeartbeatMessageDontUseDirectly.Value;
}
}

public ClusterMessageSerializer(ExtendedActorSystem system) : base(system)
{
Expand All @@ -67,9 +77,13 @@ public override byte[] ToBinary(object obj)
switch (obj)
{
case ClusterHeartbeatSender.Heartbeat heartbeat:
return AddressToProto(heartbeat.From).ToByteArray();
return UseLegacyHeartbeatMessage
? AddressToProto(heartbeat.From).ToByteArray()
: HeartbeatToProto(heartbeat).ToByteArray();
case ClusterHeartbeatSender.HeartbeatRsp heartbeatRsp:
return UniqueAddressToProto(heartbeatRsp.From).ToByteArray();
return UseLegacyHeartbeatMessage
? UniqueAddressToProto(heartbeatRsp.From).ToByteArray()
: HeartbeatRspToProto(heartbeatRsp).ToByteArray();
case GossipEnvelope gossipEnvelope:
return GossipEnvelopeToProto(gossipEnvelope);
case GossipStatus gossipStatus:
Expand Down Expand Up @@ -159,9 +173,9 @@ public override string Manifest(object o)
case InternalClusterAction.InitJoinNack _:
return InitJoinNackManifest;
case ClusterHeartbeatSender.Heartbeat _:
return HeartBeatManifestPre1419;
return UseLegacyHeartbeatMessage ? HeartBeatManifestPre1419 : HeartBeatManifest;
case ClusterHeartbeatSender.HeartbeatRsp _:
return HeartBeatRspManifestPre1419;
return UseLegacyHeartbeatMessage ? HeartBeatRspManifestPre1419 : HeartBeatRspManifest;
case InternalClusterAction.ExitingConfirmed _:
return ExitingConfirmedManifest;
case GossipStatus _:
Expand Down Expand Up @@ -460,6 +474,14 @@ private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRspAsUniq
return new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
}

private static Proto.Msg.HeartBeatResponse HeartbeatRspToProto(ClusterHeartbeatSender.HeartbeatRsp heartbeatRsp)
=> new HeartBeatResponse
{
From = UniqueAddressToProto(heartbeatRsp.From),
CreationTime = heartbeatRsp.CreationTimeNanos,
SequenceNr = heartbeatRsp.SequenceNr
};

private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRsp(byte[] bytes)
{
var hbsp = HeartBeatResponse.Parser.ParseFrom(bytes);
Expand All @@ -471,6 +493,14 @@ private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeatAsAddress(by
return new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes)), -1, -1);
}

private static Proto.Msg.Heartbeat HeartbeatToProto(ClusterHeartbeatSender.Heartbeat heartbeat)
=> new Heartbeat
{
From = AddressToProto(heartbeat.From),
CreationTime = heartbeat.CreationTimeNanos,
SequenceNr = heartbeat.SequenceNr
};

private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeat(byte[] bytes)
{
var hb = Heartbeat.Parser.ParseFrom(bytes);
Expand Down