diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt index 4e1ea162b01..be4c326589f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt @@ -221,6 +221,7 @@ namespace Akka.Cluster public System.Nullable ShutdownAfterUnsuccessfulJoinSeedNodes { get; } public System.TimeSpan UnreachableNodesReaperInterval { get; } public string UseDispatcher { get; } + public bool UseLegacyHeartbeatMessage { get; } public bool VerboseGossipReceivedLogging { get; } public bool VerboseHeartbeatLogging { get; } public System.TimeSpan WeaklyUpAfter { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt index 6b738c2f7d5..435b9d20d83 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt @@ -221,6 +221,7 @@ namespace Akka.Cluster public System.Nullable ShutdownAfterUnsuccessfulJoinSeedNodes { get; } public System.TimeSpan UnreachableNodesReaperInterval { get; } public string UseDispatcher { get; } + public bool UseLegacyHeartbeatMessage { get; } public bool VerboseGossipReceivedLogging { get; } public bool VerboseHeartbeatLogging { get; } public System.TimeSpan WeaklyUpAfter { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt index 4e1ea162b01..be4c326589f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt @@ -221,6 +221,7 @@ namespace Akka.Cluster public System.Nullable ShutdownAfterUnsuccessfulJoinSeedNodes { get; } public System.TimeSpan UnreachableNodesReaperInterval { get; } public string UseDispatcher { get; } + public bool UseLegacyHeartbeatMessage { get; } public bool VerboseGossipReceivedLogging { get; } public bool VerboseHeartbeatLogging { get; } public System.TimeSpan WeaklyUpAfter { get; } diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs index f5303bf65c3..978126d1702 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs @@ -17,12 +17,28 @@ using Akka.Util.Internal; using Xunit; using FluentAssertions; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { - public class ClusterHeartBeatSenderStateSpec : ClusterSpecBase + public class ClusterHeartBeatSenderStateSpec : ClusterHeartBeatSenderStateBase { - public ClusterHeartBeatSenderStateSpec() + public ClusterHeartBeatSenderStateSpec(ITestOutputHelper output) : base(output, false) + { + } + } + + public class ClusterHeartBeatSenderStateLegacySpec : ClusterHeartBeatSenderStateBase + { + public ClusterHeartBeatSenderStateLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class ClusterHeartBeatSenderStateBase : ClusterSpecBase + { + protected ClusterHeartBeatSenderStateBase(ITestOutputHelper output, bool useLegacyMessage) + : base(output, useLegacyMessage) { _emptyState = EmptyState(aa); } diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs index 903af0598cb..8ac11fa8dfc 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs @@ -15,12 +15,29 @@ namespace Akka.Cluster.Tests { - public class ClusterHeartbeatReceiverSpec : AkkaSpec + public class ClusterHeartbeatReceiverSpec : ClusterHeartbeatReceiverBase { - public static Config Config = @"akka.actor.provider = cluster"; + public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) : base(output, false) + { + } + } + + public class ClusterHeartbeatReceiverLegacySpec : ClusterHeartbeatReceiverBase + { + public ClusterHeartbeatReceiverLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class ClusterHeartbeatReceiverBase : AkkaSpec + { + private static Config Config(bool useLegacyHeartbeat) => $@" +akka.actor.provider = cluster +akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")} +"; - public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) - : base(Config, output) + protected ClusterHeartbeatReceiverBase(ITestOutputHelper output, bool useLegacyHeartbeat) + : base(Config(useLegacyHeartbeat), output) { } diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs index 81aa29db82c..e1ea86a5133 100644 --- a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs @@ -18,7 +18,21 @@ namespace Akka.Cluster.Tests { - public class ClusterHeartbeatSenderSpec : AkkaSpec + public class ClusterHeartbeatSenderSpec : ClusterHeartbeatSenderBase + { + public ClusterHeartbeatSenderSpec(ITestOutputHelper output) : base(output, false) + { + } + } + + public class ClusterHeartbeatSenderLegacySpec : ClusterHeartbeatSenderBase + { + public ClusterHeartbeatSenderLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class ClusterHeartbeatSenderBase : AkkaSpec { class TestClusterHeartbeatSender : ClusterHeartbeatSender { @@ -40,14 +54,15 @@ protected override ActorSelection HeartbeatReceiver(Address address) } } - public static readonly Config Config = @" + private static Config Config(bool useLegacyHeartbeat) => $@" akka.loglevel = DEBUG akka.actor.provider = cluster akka.cluster.failure-detector.heartbeat-interval = 0.2s + akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")} "; - public ClusterHeartbeatSenderSpec(ITestOutputHelper output) - : base(Config, output){ } + protected ClusterHeartbeatSenderBase(ITestOutputHelper output, bool useLegacyMessage) + : base(Config(useLegacyMessage), output){ } [Fact] public async Task ClusterHeartBeatSender_must_increment_heartbeat_SeqNo() diff --git a/src/core/Akka.Cluster.Tests/ClusterSpecBase.cs b/src/core/Akka.Cluster.Tests/ClusterSpecBase.cs index e0006917e65..569c8602885 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpecBase.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpecBase.cs @@ -7,6 +7,7 @@ using Akka.Configuration; using Akka.TestKit; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { @@ -15,20 +16,23 @@ namespace Akka.Cluster.Tests /// public abstract class ClusterSpecBase : AkkaSpec { - protected ClusterSpecBase(Config config) : base(config.WithFallback(BaseConfig)) + protected ClusterSpecBase(Config config, ITestOutputHelper output, bool useLegacyHeartbeat) + : base(config.WithFallback(BaseConfig(useLegacyHeartbeat)), output) { } - protected ClusterSpecBase() - : base(BaseConfig) + protected ClusterSpecBase(ITestOutputHelper output, bool useLegacyHeartbeat) + : base(BaseConfig(useLegacyHeartbeat), output) { } - protected static readonly Config BaseConfig = ConfigurationFactory.ParseString(@" - akka.actor.serialize-messages = on - akka.actor.serialize-creators = on"); + private static Config BaseConfig(bool useLegacyHeartbeat) => + ConfigurationFactory.ParseString($@" + akka.actor.serialize-messages = on + akka.actor.serialize-creators = on + akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}"); } } diff --git a/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs b/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs index bed553e0823..d76fe23b1c0 100644 --- a/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs +++ b/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs @@ -9,12 +9,27 @@ using Akka.Actor; using Xunit; using FluentAssertions; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { - public class HeartbeatNodeRingSpec : ClusterSpecBase + public class HeartbeatNodeRingSpec : HeartbeatNodeRingBase { - public HeartbeatNodeRingSpec() + public HeartbeatNodeRingSpec(ITestOutputHelper output) : base(output, false) + { + } + } + + public class HeartbeatNodeRingLegacySpec : HeartbeatNodeRingBase + { + public HeartbeatNodeRingLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class HeartbeatNodeRingBase : ClusterSpecBase + { + protected HeartbeatNodeRingBase(ITestOutputHelper output, bool useLegacyMessage) : base(output, useLegacyMessage) { _nodes = ImmutableHashSet.Create(aa, bb, cc, dd, ee, ff); } diff --git a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs index 19dc355ff8d..6a19a111fa3 100644 --- a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs +++ b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs @@ -15,16 +15,36 @@ using Xunit; using FluentAssertions; using Akka.Util; +using Akka.Util.Internal; using Google.Protobuf; +using Xunit.Abstractions; namespace Akka.Cluster.Tests.Serialization { - public class ClusterMessageSerializerSpec : AkkaSpec + public class ClusterMessageSerializerSpec: ClusterMessageSerializerBase { - public ClusterMessageSerializerSpec() - : base(@"akka.actor.provider = cluster") + public ClusterMessageSerializerSpec(ITestOutputHelper output) : base(output, false) { } + } + + public class ClusterMessageSerializerLegacySpec: ClusterMessageSerializerBase + { + public ClusterMessageSerializerLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class ClusterMessageSerializerBase : AkkaSpec + { + private readonly bool _useLegacyHeartbeat; + public ClusterMessageSerializerBase(ITestOutputHelper output, bool useLegacyHeartbeat) + : base($@" +akka.actor.provider = cluster +akka.cluster.use-legacy-heartbeat-message = {(useLegacyHeartbeat ? "true" : "false")}", output) + { + _useLegacyHeartbeat = useLegacyHeartbeat; + } private static readonly Member a1 = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Joining, appVersion: AppVersion.Create("1.0.0")); private static readonly Member b1 = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Up, ImmutableHashSet.Create("r1"), appVersion: AppVersion.Create("1.1.0")); @@ -36,22 +56,11 @@ public ClusterMessageSerializerSpec() public void Can_serialize_Heartbeat() { var address = new Address("akka.tcp", "system", "some.host.org", 4711); - var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1); - AssertEqual(message); - } - - [Fact] - public void Can_serialize_Hearbeatv1419_later() - { - var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat() - { - From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address), - CreationTime = 2, - SequenceNr = 1 - }.ToByteArray(); - - var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat)); - serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest); + var legacyMessage = new ClusterHeartbeatSender.Heartbeat(address, -1, -1); + var message = new ClusterHeartbeatSender.Heartbeat(address, 10, 3); + + // Legacy heartbeat serializer will replace the sequence number and creation date with -1 and -1 respectively + AssertEqual(message, _useLegacyHeartbeat ? legacyMessage : message); } [Fact] @@ -59,22 +68,11 @@ public void Can_serialize_HeartbeatRsp() { var address = new Address("akka.tcp", "system", "some.host.org", 4711); var uniqueAddress = new UniqueAddress(address, 17); - var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1); - AssertEqual(message); - } - - [Fact] - public void Can_serialize_HearbeatRspv1419_later() - { - var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse() - { - From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress), - CreationTime = 2, - SequenceNr = 1 - }.ToByteArray(); - - var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat)); - serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest); + var legacyMessage = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1); + var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, 10, 3); + + // Legacy heartbeat serializer will replace the sequence number and creation date with -1 and -1 respectively + AssertEqual(message, _useLegacyHeartbeat ? legacyMessage : message); } [Fact] @@ -220,16 +218,18 @@ public void Can_serialize_ClusterRouterPoolWithEmptyRole() private T AssertAndReturn(T message) { - var serializer = Sys.Serialization.FindSerializerFor(message); - var serialized = serializer.ToBinary(message); + var serializer = (SerializerWithStringManifest) Sys.Serialization.FindSerializerFor(message); serializer.Should().BeOfType(); - return serializer.FromBinary(serialized); + + var serialized = serializer.ToBinary(message); + var manifest = serializer.Manifest(message); + return (T) serializer.FromBinary(serialized, manifest); } - private void AssertEqual(T message) + private void AssertEqual(T message, T newMessage = null) where T : class { var deserialized = AssertAndReturn(message); - Assert.Equal(message, deserialized); + Assert.Equal(newMessage ?? message, deserialized); } } } diff --git a/src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs b/src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs index 234ca8b78bb..e4d9bfaa17c 100644 --- a/src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs +++ b/src/core/Akka.Cluster.Tests/SerializationChecksSpec.cs @@ -5,13 +5,33 @@ // //----------------------------------------------------------------------- +using Akka.Configuration; using Akka.TestKit; using Xunit; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { - public class SerializationChecksSpec : ClusterSpecBase + public class SerializationChecksSpec : SerializationChecksBase { + public SerializationChecksSpec(ITestOutputHelper output) : base(output, false) + { + } + } + + public class SerializationChecksLegacySpec : SerializationChecksBase + { + public SerializationChecksLegacySpec(ITestOutputHelper output) : base(output, true) + { + } + } + + public abstract class SerializationChecksBase : ClusterSpecBase + { + protected SerializationChecksBase(ITestOutputHelper output, bool useLegacyHeartbeat) : base(output, useLegacyHeartbeat) + { + } + [Fact] public void Settings_serializemessages_and_serializecreators_must_be_on_for_tests() { diff --git a/src/core/Akka.Cluster/ClusterSettings.cs b/src/core/Akka.Cluster/ClusterSettings.cs index 2894cd50c5d..9eb3e4aece0 100644 --- a/src/core/Akka.Cluster/ClusterSettings.cs +++ b/src/core/Akka.Cluster/ClusterSettings.cs @@ -118,6 +118,7 @@ TimeSpan GetWeaklyUpDuration() WeaklyUpAfter = GetWeaklyUpDuration(); + UseLegacyHeartbeatMessage = clusterConfig.GetBoolean("use-legacy-heartbeat-message", false); } /// @@ -300,6 +301,13 @@ TimeSpan GetWeaklyUpDuration() /// The leader will move members to status once convergence has been reached. /// public TimeSpan WeaklyUpAfter { get; } + + /// + /// Enable/disable legacy pre-1.4.19 and + /// wire format serialization support. + /// Set this to true if you're doing a rolling update from Akka.NET version older than 1.4.19. + /// + public bool UseLegacyHeartbeatMessage { get; } } } diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index a708faee3af..261ff2ae93a 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -169,6 +169,10 @@ akka { # greater than this value. reduce-gossip-different-view-probability = 400 + # Enable/disable legacy pre-1.4.19 heartbeat and heartbeat response wire format serialization support + # Set this flag to true if you're doing a rolling update from Akka.NET version older than 1.4.19. + use-legacy-heartbeat-message = false + # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the cluster subsystem to detect unreachable # members. diff --git a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs index 6b114931623..b53de94f557 100644 --- a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs +++ b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs @@ -55,6 +55,16 @@ public class ClusterMessageSerializer : SerializerWithStringManifest internal const string GossipEnvelopeManifest = "Akka.Cluster.GossipEnvelope, Akka.Cluster"; internal const string ClusterRouterPoolManifest = "Akka.Cluster.Routing.ClusterRouterPool, Akka.Cluster"; + private Option _useLegacyHeartbeatMessageDontUseDirectly = Option.None; + private bool UseLegacyHeartbeatMessage + { + get + { + if(_useLegacyHeartbeatMessageDontUseDirectly.IsEmpty) + _useLegacyHeartbeatMessageDontUseDirectly = Cluster.Get(system).Settings.UseLegacyHeartbeatMessage; + return _useLegacyHeartbeatMessageDontUseDirectly.Value; + } + } public ClusterMessageSerializer(ExtendedActorSystem system) : base(system) { @@ -67,9 +77,13 @@ public override byte[] ToBinary(object obj) switch (obj) { case ClusterHeartbeatSender.Heartbeat heartbeat: - return AddressToProto(heartbeat.From).ToByteArray(); + return UseLegacyHeartbeatMessage + ? AddressToProto(heartbeat.From).ToByteArray() + : HeartbeatToProto(heartbeat).ToByteArray(); case ClusterHeartbeatSender.HeartbeatRsp heartbeatRsp: - return UniqueAddressToProto(heartbeatRsp.From).ToByteArray(); + return UseLegacyHeartbeatMessage + ? UniqueAddressToProto(heartbeatRsp.From).ToByteArray() + : HeartbeatRspToProto(heartbeatRsp).ToByteArray(); case GossipEnvelope gossipEnvelope: return GossipEnvelopeToProto(gossipEnvelope); case GossipStatus gossipStatus: @@ -159,9 +173,9 @@ public override string Manifest(object o) case InternalClusterAction.InitJoinNack _: return InitJoinNackManifest; case ClusterHeartbeatSender.Heartbeat _: - return HeartBeatManifestPre1419; + return UseLegacyHeartbeatMessage ? HeartBeatManifestPre1419 : HeartBeatManifest; case ClusterHeartbeatSender.HeartbeatRsp _: - return HeartBeatRspManifestPre1419; + return UseLegacyHeartbeatMessage ? HeartBeatRspManifestPre1419 : HeartBeatRspManifest; case InternalClusterAction.ExitingConfirmed _: return ExitingConfirmedManifest; case GossipStatus _: @@ -460,6 +474,14 @@ private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRspAsUniq return new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1); } + private static Proto.Msg.HeartBeatResponse HeartbeatRspToProto(ClusterHeartbeatSender.HeartbeatRsp heartbeatRsp) + => new HeartBeatResponse + { + From = UniqueAddressToProto(heartbeatRsp.From), + CreationTime = heartbeatRsp.CreationTimeNanos, + SequenceNr = heartbeatRsp.SequenceNr + }; + private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRsp(byte[] bytes) { var hbsp = HeartBeatResponse.Parser.ParseFrom(bytes); @@ -471,6 +493,14 @@ private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeatAsAddress(by return new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes)), -1, -1); } + private static Proto.Msg.Heartbeat HeartbeatToProto(ClusterHeartbeatSender.Heartbeat heartbeat) + => new Heartbeat + { + From = AddressToProto(heartbeat.From), + CreationTime = heartbeat.CreationTimeNanos, + SequenceNr = heartbeat.SequenceNr + }; + private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeat(byte[] bytes) { var hb = Heartbeat.Parser.ParseFrom(bytes);