From dd0f6b2da2dc8a891f4193f13f2b13f69a2f2c8a Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 13 Apr 2021 05:24:48 +0700 Subject: [PATCH 1/3] porting Cluster heartbeat timings, hardened Akka.Cluster serialization port https://github.com/akka/akka/pull/27281 port https://github.com/akka/akka/pull/25183 port https://github.com/akka/akka/pull/24625 --- .../ClusterHeartbeatReceiverSpec.cs | 35 ++ .../ClusterHeartbeatSenderSpec.cs | 66 +++ .../ClusterMessageSerializerSpec.cs | 36 +- src/core/Akka.Cluster/Cluster.cs | 26 +- src/core/Akka.Cluster/ClusterDaemon.cs | 2 +- src/core/Akka.Cluster/ClusterHeartbeat.cs | 146 +++-- .../Serialization/ClusterMessageSerializer.cs | 171 ++++-- .../Serialization/Proto/ClusterMessages.g.cs | 498 +++++++++++++++--- src/core/Akka.Remote.TestKit/CommandLine.cs | 19 +- src/protobuf/ClusterMessages.proto | 14 + 10 files changed, 862 insertions(+), 151 deletions(-) create mode 100644 src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs create mode 100644 src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs new file mode 100644 index 00000000000..90dd2c6d90c --- /dev/null +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs @@ -0,0 +1,35 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; +using static Akka.Cluster.ClusterHeartbeatSender; + +namespace Akka.Cluster.Tests +{ + public class ClusterHeartbeatReceiverSpec : AkkaSpec + { + public static Config Config = @"akka.actor.provider = cluster"; + + public ClusterHeartbeatReceiverSpec(ITestOutputHelper output) + : base(Config, output) + { + + } + + [Fact] + public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime() + { + var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys))); + heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2)); + ExpectMsg(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2)); + } + } +} diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs new file mode 100644 index 00000000000..fdea539c91b --- /dev/null +++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs @@ -0,0 +1,66 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Configuration; +using Akka.TestKit; +using Akka.Util; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +using static Akka.Cluster.ClusterHeartbeatSender; + +namespace Akka.Cluster.Tests +{ + public class ClusterHeartbeatSenderSpec : AkkaSpec + { + class TestClusterHeartbeatSender : ClusterHeartbeatSender + { + private readonly TestProbe _probe; + + public TestClusterHeartbeatSender(TestProbe probe) + { + _probe = probe; + } + + protected override void PreStart() + { + // don't register for cluster events + } + + protected override ActorSelection HeartbeatReceiver(Address address) + { + return Context.ActorSelection(_probe.Ref.Path); + } + } + + public static readonly Config Config = @" + akka.loglevel = DEBUG + akka.actor.provider = cluster + akka.cluster.failure-detector.heartbeat-interval = 0.2s + "; + + public ClusterHeartbeatSenderSpec(ITestOutputHelper output) + : base(Config, output){ } + + [Fact] + public void ClusterHeartBeatSender_must_increment_heartbeat_SeqNo() + { + var probe = CreateTestProbe(); + var underTest = Sys.ActorOf(Props.Create(() => new TestClusterHeartbeatSender(probe))); + + underTest.Tell(new ClusterEvent.CurrentClusterState()); + underTest.Tell(new ClusterEvent.MemberUp(new Member( + new UniqueAddress(new Address("akka", Sys.Name), 1), 1, + MemberStatus.Up, ImmutableHashSet.Empty, AppVersion.Zero))); + + probe.ExpectMsg().SequenceNr.Should().Be(1L); + probe.ExpectMsg().SequenceNr.Should().Be(2L); + } + } +} diff --git a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs index 2781b51ffb4..19dc355ff8d 100644 --- a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs +++ b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs @@ -8,11 +8,14 @@ using System.Collections.Immutable; using Akka.Actor; using Akka.Cluster.Routing; +using Akka.Cluster.Serialization; using Akka.Routing; +using Akka.Serialization; using Akka.TestKit; using Xunit; using FluentAssertions; using Akka.Util; +using Google.Protobuf; namespace Akka.Cluster.Tests.Serialization { @@ -33,19 +36,47 @@ public ClusterMessageSerializerSpec() public void Can_serialize_Heartbeat() { var address = new Address("akka.tcp", "system", "some.host.org", 4711); - var message = new ClusterHeartbeatSender.Heartbeat(address); + var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1); AssertEqual(message); } + [Fact] + public void Can_serialize_Hearbeatv1419_later() + { + var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat() + { + From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address), + CreationTime = 2, + SequenceNr = 1 + }.ToByteArray(); + + var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat)); + serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest); + } + [Fact] public void Can_serialize_HeartbeatRsp() { var address = new Address("akka.tcp", "system", "some.host.org", 4711); var uniqueAddress = new UniqueAddress(address, 17); - var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress); + var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1); AssertEqual(message); } + [Fact] + public void Can_serialize_HearbeatRspv1419_later() + { + var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse() + { + From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress), + CreationTime = 2, + SequenceNr = 1 + }.ToByteArray(); + + var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat)); + serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest); + } + [Fact] public void Can_serialize_GossipEnvelope() { @@ -191,6 +222,7 @@ private T AssertAndReturn(T message) { var serializer = Sys.Serialization.FindSerializerFor(message); var serialized = serializer.ToBinary(message); + serializer.Should().BeOfType(); return serializer.FromBinary(serialized); } diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 52de7e074b5..5bef4d02955 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -538,10 +538,7 @@ internal void Shutdown() LogInfo("Shutting down..."); System.Stop(_clusterDaemons); - if (_readView != null) - { - _readView.Dispose(); - } + _readView?.Dispose(); LogInfo("Successfully shut down"); } @@ -583,6 +580,27 @@ public InfoLogger(ILoggingAdapter log, ClusterSettings settings, Address selfAdd _selfAddress = selfAddress; } + /// + /// Creates an log entry with the specific message. + /// + /// The message being logged. + internal void LogDebug(string message) + { + if (_log.IsDebugEnabled) + _log.Debug("Cluster Node [{0}] - {1}", _selfAddress, message); + } + + /// + /// Creates an log entry with the specific template and arguments. + /// + /// The template being rendered and logged. + /// The argument that fills in the template placeholder. + internal void LogDebug(string template, object arg1) + { + if (_log.IsDebugEnabled) + _log.Debug("Cluster Node [{1}] - " + template, arg1, _selfAddress); + } + /// /// Creates an log entry with the specific message. /// diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index e3f0e085294..c8a1d3e9333 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -898,7 +898,7 @@ private void CreateChildren() { _coreSupervisor = Context.ActorOf(Props.Create(), "core"); - Context.ActorOf(Props.Create(), "heartbeatReceiver"); + Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver"); } protected override void PostStop() diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index f4a8ef607f0..0ecf786f4a7 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -12,6 +12,7 @@ using Akka.Actor; using Akka.Event; using Akka.Remote; +using Akka.Util; using Akka.Util.Internal; namespace Akka.Cluster @@ -21,33 +22,53 @@ namespace Akka.Cluster /// /// Receives messages and replies. /// - internal sealed class ClusterHeartbeatReceiver : ReceiveActor + internal sealed class ClusterHeartbeatReceiver : UntypedActor { - private readonly Lazy _selfHeartbeatRsp; + // Important - don't use Cluster.Get(Context.System) in constructor because that would + // cause deadlock. See startup sequence in ClusterDaemon. + private readonly Lazy _cluster; + + public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging; /// /// TBD /// - public ClusterHeartbeatReceiver() + public ClusterHeartbeatReceiver(Func getCluster) { - // Important - don't use Cluster.Get(Context.System) in constructor because that would - // cause deadlock. See startup sequence in ClusterDaemon. - _selfHeartbeatRsp = new Lazy(() => - new ClusterHeartbeatSender.HeartbeatRsp(Cluster.Get(Context.System).SelfUniqueAddress)); + _cluster = new Lazy(getCluster); + } - Receive(heartbeat => Sender.Tell(_selfHeartbeatRsp.Value)); + protected override void OnReceive(object message) + { + switch (message) + { + case ClusterHeartbeatSender.Heartbeat hb: + // TODO log the sequence nr once serializer is enabled + if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From); + Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress, + hb.SequenceNr, hb.CreationTimeNanos)); + break; + default: + Unhandled(message); + break; + } } + + public static Props Props(Func getCluster) + { + return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster)); + } + } /// /// INTERNAL API /// - internal sealed class ClusterHeartbeatSender : ReceiveActor + internal class ClusterHeartbeatSender : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Cluster _cluster; private readonly IFailureDetectorRegistry
_failureDetector; - private readonly Heartbeat _selfHeartbeat; private ClusterHeartbeatSenderState _state; private readonly ICancelable _heartbeatTask; @@ -66,8 +87,6 @@ public ClusterHeartbeatSender() // the failureDetector is only updated by this actor, but read from other places _failureDetector = _cluster.FailureDetector; - _selfHeartbeat = new Heartbeat(_cluster.SelfAddress); - _state = new ClusterHeartbeatSenderState( ring: new HeartbeatNodeRing( _cluster.SelfUniqueAddress, @@ -88,6 +107,13 @@ public ClusterHeartbeatSender() Initializing(); } + private long _seqNo; + private Heartbeat SelfHeartbeat() + { + _seqNo += 1; + return new Heartbeat(_cluster.SelfAddress, _seqNo, MonotonicClock.GetNanos()); + } + /// /// TBD /// @@ -112,7 +138,7 @@ protected override void PostStop() /// /// Looks up and returns the remote cluster heartbeat connection for the specific address. /// - private ActorSelection HeartbeatReceiver(Address address) + protected virtual ActorSelection HeartbeatReceiver(Address address) { return Context.ActorSelection(new RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver"); } @@ -133,7 +159,7 @@ private void Initializing() private void Active() { Receive(tick => DoHeartbeat()); - Receive(rsp => DoHeartbeatRsp(rsp.From)); + Receive(rsp => DoHeartbeatRsp(rsp)); Receive(removed => RemoveMember(removed.Member)); Receive(evt => AddMember(evt.Member)); Receive(m => UnreachableMember(m.Member)); @@ -204,7 +230,7 @@ private void DoHeartbeat() new ExpectedFirstHeartbeat(to), Self); } - HeartbeatReceiver(to.Address).Tell(_selfHeartbeat); + HeartbeatReceiver(to.Address).Tell(SelfHeartbeat()); } CheckTickInterval(); @@ -227,13 +253,14 @@ private void CheckTickInterval() _tickTimestamp = DateTime.UtcNow; } - private void DoHeartbeatRsp(UniqueAddress from) + private void DoHeartbeatRsp(HeartbeatRsp rsp) { if (_cluster.Settings.VerboseHeartbeatLogging) { - _log.Debug("Cluster Node [{0}] - Heartbeat response from [{1}]", _cluster.SelfAddress, from.Address); + // TODO: log response time and validate sequence nrs once serialisation of sendTime is released + _log.Debug("Cluster Node [{0}] - Heartbeat response from [{1}]", _cluster.SelfAddress, rsp.From.Address); } - _state = _state.HeartbeatRsp(from); + _state = _state.HeartbeatRsp(rsp.From); } private void TriggerFirstHeart(UniqueAddress from) @@ -253,70 +280,85 @@ private void TriggerFirstHeart(UniqueAddress from) /// /// Sent at regular intervals for failure detection /// - internal sealed class Heartbeat : IClusterMessage, IPriorityMessage, IDeadLetterSuppression + internal sealed class Heartbeat : IClusterMessage, IPriorityMessage, IDeadLetterSuppression, IEquatable { - /// - /// TBD - /// - /// TBD - public Heartbeat(Address from) + public Heartbeat(Address from, long sequenceNr, long creationTimeNanos) { From = from; + SequenceNr = sequenceNr; + CreationTimeNanos = creationTimeNanos; } - /// - /// TBD - /// public Address From { get; } -#pragma warning disable 659 //there might very well be multiple heartbeats from the same address. overriding GetHashCode may have uninteded side effects - /// + public long SequenceNr { get; } + + public long CreationTimeNanos { get; } + + public bool Equals(Heartbeat other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return From.Equals(other.From) && SequenceNr == other.SequenceNr && CreationTimeNanos == other.CreationTimeNanos; + } + public override bool Equals(object obj) -#pragma warning restore 659 { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - return obj is Heartbeat && Equals((Heartbeat)obj); + return ReferenceEquals(this, obj) || obj is Heartbeat other && Equals(other); } - private bool Equals(Heartbeat other) + public override int GetHashCode() { - return Equals(From, other.From); + unchecked + { + var hashCode = From.GetHashCode(); + hashCode = (hashCode * 397) ^ SequenceNr.GetHashCode(); + hashCode = (hashCode * 397) ^ CreationTimeNanos.GetHashCode(); + return hashCode; + } } } /// /// Sends replies to messages /// - internal sealed class HeartbeatRsp : IClusterMessage, IPriorityMessage, IDeadLetterSuppression + internal sealed class HeartbeatRsp : IClusterMessage, IPriorityMessage, IDeadLetterSuppression, IEquatable { - /// - /// TBD - /// - /// TBD - public HeartbeatRsp(UniqueAddress from) + public HeartbeatRsp(UniqueAddress from, long sequenceNr, long creationTimeNanos) { From = from; + SequenceNr = sequenceNr; + CreationTimeNanos = creationTimeNanos; } - /// - /// TBD - /// public UniqueAddress From { get; } -#pragma warning disable 659 //there might very well be multiple heartbeats from the same address. overriding GetHashCode may have uninteded side effects - /// + public long SequenceNr { get; } + + public long CreationTimeNanos { get; } + + public bool Equals(HeartbeatRsp other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return From.Equals(other.From) && SequenceNr == other.SequenceNr + && CreationTimeNanos == other.CreationTimeNanos; + } + public override bool Equals(object obj) -#pragma warning restore 659 { - if (ReferenceEquals(null, obj)) return false; - if (ReferenceEquals(this, obj)) return true; - return obj is HeartbeatRsp && Equals((HeartbeatRsp)obj); + return ReferenceEquals(this, obj) || obj is HeartbeatRsp other && Equals(other); } - private bool Equals(HeartbeatRsp other) + public override int GetHashCode() { - return Equals(From, other.From); + unchecked + { + var hashCode = From.GetHashCode(); + hashCode = (hashCode * 397) ^ SequenceNr.GetHashCode(); + hashCode = (hashCode * 397) ^ CreationTimeNanos.GetHashCode(); + return hashCode; + } } } diff --git a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs index dae1b3a199e..f3d7e443262 100644 --- a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs +++ b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs @@ -12,41 +12,55 @@ using System.Runtime.CompilerServices; using System.Runtime.Serialization; using Akka.Actor; -using Akka.Cluster.Routing; +using Akka.Annotations; +using Akka.Cluster.Serialization.Proto.Msg; using Akka.Serialization; using Akka.Util; using Akka.Util.Internal; using Google.Protobuf; using AddressData = Akka.Remote.Serialization.Proto.Msg.AddressData; +using ClusterRouterPool = Akka.Cluster.Routing.ClusterRouterPool; +using ClusterRouterPoolSettings = Akka.Cluster.Routing.ClusterRouterPoolSettings; namespace Akka.Cluster.Serialization { - public class ClusterMessageSerializer : Serializer + [InternalApi] + public class ClusterMessageSerializer : SerializerWithStringManifest { - private readonly Dictionary> _fromBinaryMap; + /* + * BUG: should have been a SerializerWithStringManifest this entire time + * Since it wasn't need to include full type names for backwards compatibility + */ + internal const string JoinManifest = "Akka.Cluster.InternalClusterAction+Join, Akka.Cluster"; + internal const string WelcomeManifest = "Akka.Cluster.InternalClusterAction+Welcome, Akka.Cluster"; + internal const string LeaveManifest = "Akka.Cluster.ClusterUserAction+Leave, Akka.Cluster"; + internal const string DownManifest = "Akka.Cluster.ClusterUserAction+Down, Akka.Cluster"; + + internal const string InitJoinManifest = "Akka.Cluster.InternalClusterAction+InitJoin, Akka.Cluster"; + + internal const string InitJoinAckManifest = "Akka.Cluster.InternalClusterAction+InitJoinAck, Akka.Cluster"; + + internal const string InitJoinNackManifest = "Akka.Cluster.InternalClusterAction+InitJoinNack, Akka.Cluster"; + + // TODO: remove in a future version of Akka.NET (2.0) + internal const string HeartBeatManifestPre1419 = "Akka.Cluster.ClusterHeartbeatSender+Heartbeat, Akka.Cluster"; + internal const string HeartBeatRspManifestPre1419 = "Akka.Cluster.ClusterHeartbeatSender+HeartbeatRsp, Akka.Cluster"; + + internal const string HeartBeatManifest = "HB"; + internal const string HeartBeatRspManifest = "HBR"; + + internal const string ExitingConfirmedManifest = "Akka.Cluster.InternalClusterAction+ExitingConfirmed, Akka.Cluster"; + + internal const string GossipStatusManifest = "Akka.Cluster.GossipStatus, Akka.Cluster"; + internal const string GossipEnvelopeManifest = "Akka.Cluster.GossipEnvelope, Akka.Cluster"; + internal const string ClusterRouterPoolManifest = "Akka.Cluster.Routing.ClusterRouterPool, Akka.Cluster"; + public ClusterMessageSerializer(ExtendedActorSystem system) : base(system) { - _fromBinaryMap = new Dictionary> - { - [typeof(ClusterHeartbeatSender.Heartbeat)] = bytes => new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes))), - [typeof(ClusterHeartbeatSender.HeartbeatRsp)] = bytes => new ClusterHeartbeatSender.HeartbeatRsp(UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes))), - [typeof(GossipEnvelope)] = GossipEnvelopeFrom, - [typeof(GossipStatus)] = GossipStatusFrom, - [typeof(InternalClusterAction.Join)] = JoinFrom, - [typeof(InternalClusterAction.Welcome)] = WelcomeFrom, - [typeof(ClusterUserAction.Leave)] = bytes => new ClusterUserAction.Leave(AddressFrom(AddressData.Parser.ParseFrom(bytes))), - [typeof(ClusterUserAction.Down)] = bytes => new ClusterUserAction.Down(AddressFrom(AddressData.Parser.ParseFrom(bytes))), - [typeof(InternalClusterAction.InitJoin)] = bytes => new InternalClusterAction.InitJoin(), - [typeof(InternalClusterAction.InitJoinAck)] = bytes => new InternalClusterAction.InitJoinAck(AddressFrom(AddressData.Parser.ParseFrom(bytes))), - [typeof(InternalClusterAction.InitJoinNack)] = bytes => new InternalClusterAction.InitJoinNack(AddressFrom(AddressData.Parser.ParseFrom(bytes))), - [typeof(InternalClusterAction.ExitingConfirmed)] = bytes => new InternalClusterAction.ExitingConfirmed(UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes))), - [typeof(ClusterRouterPool)] = ClusterRouterPoolFrom - }; - } - - public override bool IncludeManifest => true; + + } public override byte[] ToBinary(object obj) { @@ -83,12 +97,79 @@ public override byte[] ToBinary(object obj) } } - public override object FromBinary(byte[] bytes, Type type) + public override object FromBinary(byte[] bytes, string manifest) { - if (_fromBinaryMap.TryGetValue(type, out var factory)) - return factory(bytes); + switch (manifest) + { + case HeartBeatManifestPre1419: + return DeserializeHeartbeatAsAddress(bytes); + case HeartBeatRspManifestPre1419: + return DeserializeHeartbeatRspAsUniqueAddress(bytes); + case HeartBeatManifest: + return DeserializeHeartbeat(bytes); + case HeartBeatRspManifest: + return DeserializeHeartbeatRsp(bytes); + case GossipStatusManifest: + return GossipStatusFrom(bytes); + case GossipEnvelopeManifest: + return GossipEnvelopeFrom(bytes); + case InitJoinManifest: + return new InternalClusterAction.InitJoin(); + case InitJoinAckManifest: + return new InternalClusterAction.InitJoinAck(AddressFrom(AddressData.Parser.ParseFrom(bytes))); + case InitJoinNackManifest: + return new InternalClusterAction.InitJoinNack(AddressFrom(AddressData.Parser.ParseFrom(bytes))); + case JoinManifest: + return JoinFrom(bytes); + case WelcomeManifest: + return WelcomeFrom(bytes); + case LeaveManifest: + return new ClusterUserAction.Leave(AddressFrom(AddressData.Parser.ParseFrom(bytes))); + case DownManifest: + return new ClusterUserAction.Down(AddressFrom(AddressData.Parser.ParseFrom(bytes))); + case ExitingConfirmedManifest: + return new InternalClusterAction.ExitingConfirmed( + UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes))); + case ClusterRouterPoolManifest: + return ClusterRouterPoolFrom(bytes); + default: + throw new ArgumentException($"Unknown manifest [{manifest}] in [{nameof(ClusterMessageSerializer)}]"); + } + } - throw new SerializationException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}"); + public override string Manifest(object o) + { + switch (o) + { + case InternalClusterAction.Join _: + return JoinManifest; + case InternalClusterAction.Welcome _: + return WelcomeManifest; + case ClusterUserAction.Leave _: + return LeaveManifest; + case ClusterUserAction.Down _: + return DownManifest; + case InternalClusterAction.InitJoin _: + return InitJoinManifest; + case InternalClusterAction.InitJoinAck _: + return InitJoinAckManifest; + case InternalClusterAction.InitJoinNack _: + return InitJoinNackManifest; + case ClusterHeartbeatSender.Heartbeat _: + return HeartBeatManifestPre1419; + case ClusterHeartbeatSender.HeartbeatRsp _: + return HeartBeatRspManifestPre1419; + case InternalClusterAction.ExitingConfirmed _: + return ExitingConfirmedManifest; + case GossipStatus _: + return GossipStatusManifest; + case GossipEnvelope _: + return GossipEnvelopeManifest; + case ClusterRouterPool _: + return ClusterRouterPoolManifest; + default: + throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{GetType()}]"); + } } // @@ -110,6 +191,7 @@ private static InternalClusterAction.Join JoinFrom(byte[] bytes) return new InternalClusterAction.Join(UniqueAddressFrom(join.Node), join.Roles.ToImmutableHashSet(), ver); } + // TODO: need to gzip compress the Welcome message for large clusters private static byte[] WelcomeMessageBuilder(InternalClusterAction.Welcome welcome) { var welcomeProto = new Proto.Msg.Welcome(); @@ -366,12 +448,38 @@ private static int MapWithErrorMessage(Dictionary map, T value, strin throw new ArgumentException($"Unknown {unknown} [{value}] in cluster message"); } + // + // Heartbeat + // + private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRspAsUniqueAddress(byte[] bytes) + { + var uniqueAddress = UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes)); + return new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1); + } + + private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRsp(byte[] bytes) + { + var hbsp = HeartBeatResponse.Parser.ParseFrom(bytes); + return new ClusterHeartbeatSender.HeartbeatRsp(UniqueAddressFrom(hbsp.From), hbsp.SequenceNr, hbsp.CreationTime); + } + + private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeatAsAddress(byte[] bytes) + { + return new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes)), -1, -1); + } + + private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeat(byte[] bytes) + { + var hb = Heartbeat.Parser.ParseFrom(bytes); + return new ClusterHeartbeatSender.Heartbeat(AddressFrom(hb.From), hb.SequenceNr, hb.CreationTime); + } + // // Address // [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static AddressData AddressToProto(Address address) + internal static AddressData AddressToProto(Address address) { var message = new AddressData(); message.System = address.System; @@ -382,7 +490,7 @@ private static AddressData AddressToProto(Address address) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static Address AddressFrom(AddressData addressProto) + internal static Address AddressFrom(AddressData addressProto) { return new Address( addressProto.Protocol, @@ -392,7 +500,7 @@ private static Address AddressFrom(AddressData addressProto) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress) + internal static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress) { var message = new Proto.Msg.UniqueAddress(); message.Address = AddressToProto(uniqueAddress.Address); @@ -401,7 +509,7 @@ private static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress unique } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAddressProto) + internal static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAddressProto) { return new UniqueAddress(AddressFrom(uniqueAddressProto.Address), (int)uniqueAddressProto.Uid); } @@ -409,8 +517,7 @@ private static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAdd [MethodImpl(MethodImplOptions.AggressiveInlining)] private static string GetObjectManifest(Serializer serializer, object obj) { - var manifestSerializer = serializer as SerializerWithStringManifest; - if (manifestSerializer != null) + if (serializer is SerializerWithStringManifest manifestSerializer) { return manifestSerializer.Manifest(obj); } diff --git a/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs b/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs index fb96c5a3959..bf5059bfda8 100644 --- a/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs +++ b/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs @@ -29,58 +29,66 @@ static ClusterMessagesReflection() { "cHBWZXJzaW9uGAMgASgJIooBCgdXZWxjb21lEkEKBGZyb20YASABKAsyMy5B", "a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRk", "cmVzcxI8CgZnb3NzaXAYAiABKAsyLC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph", - "dGlvbi5Qcm90by5Nc2cuR29zc2lwIq4BCg5Hb3NzaXBFbnZlbG9wZRJBCgRm", - "cm9tGAEgASgLMjMuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8u", - "TXNnLlVuaXF1ZUFkZHJlc3MSPwoCdG8YAiABKAsyMy5Ba2thLkNsdXN0ZXIu", - "U2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRkcmVzcxIYChBzZXJp", - "YWxpemVkR29zc2lwGAMgASgMIqgBCgxHb3NzaXBTdGF0dXMSQQoEZnJvbRgB", - "IAEoCzIzLkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5V", - "bmlxdWVBZGRyZXNzEhEKCWFsbEhhc2hlcxgCIAMoCRJCCgd2ZXJzaW9uGAMg", - "ASgLMjEuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZl", - "Y3RvckNsb2NrItsCCgZHb3NzaXASSQoMYWxsQWRkcmVzc2VzGAEgAygLMjMu", - "QWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlVuaXF1ZUFk", - "ZHJlc3MSEAoIYWxsUm9sZXMYAiADKAkSEQoJYWxsSGFzaGVzGAMgAygJEj0K", - "B21lbWJlcnMYBCADKAsyLC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q", - "cm90by5Nc2cuTWVtYmVyEkYKCG92ZXJ2aWV3GAUgASgLMjQuQWtrYS5DbHVz", - "dGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLkdvc3NpcE92ZXJ2aWV3EkIK", - "B3ZlcnNpb24YBiABKAsyMS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q", - "cm90by5Nc2cuVmVjdG9yQ2xvY2sSFgoOYWxsQXBwVmVyc2lvbnMYCCADKAki", - "eAoOR29zc2lwT3ZlcnZpZXcSDAoEc2VlbhgBIAMoBRJYChRvYnNlcnZlclJl", - "YWNoYWJpbGl0eRgCIAMoCzI6LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9u", - "LlByb3RvLk1zZy5PYnNlcnZlclJlYWNoYWJpbGl0eSKVAQoUT2JzZXJ2ZXJS", - "ZWFjaGFiaWxpdHkSFAoMYWRkcmVzc0luZGV4GAEgASgFEg8KB3ZlcnNpb24Y", - "BCABKAMSVgoTc3ViamVjdFJlYWNoYWJpbGl0eRgCIAMoCzI5LkFra2EuQ2x1", - "c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5TdWJqZWN0UmVhY2hhYmls", - "aXR5IuABChNTdWJqZWN0UmVhY2hhYmlsaXR5EhQKDGFkZHJlc3NJbmRleBgB", - "IAEoBRJcCgZzdGF0dXMYAyABKA4yTC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph", - "dGlvbi5Qcm90by5Nc2cuU3ViamVjdFJlYWNoYWJpbGl0eS5SZWFjaGFiaWxp", - "dHlTdGF0dXMSDwoHdmVyc2lvbhgEIAEoAyJEChJSZWFjaGFiaWxpdHlTdGF0", - "dXMSDQoJUmVhY2hhYmxlEAASDwoLVW5yZWFjaGFibGUQARIOCgpUZXJtaW5h", - "dGVkEAIikgIKBk1lbWJlchIUCgxhZGRyZXNzSW5kZXgYASABKAUSEAoIdXBO", - "dW1iZXIYAiABKAUSSQoGc3RhdHVzGAMgASgOMjkuQWtrYS5DbHVzdGVyLlNl", - "cmlhbGl6YXRpb24uUHJvdG8uTXNnLk1lbWJlci5NZW1iZXJTdGF0dXMSGAoM", - "cm9sZXNJbmRleGVzGAQgAygFQgIQARIXCg9hcHBWZXJzaW9uSW5kZXgYBSAB", - "KAUiYgoMTWVtYmVyU3RhdHVzEgsKB0pvaW5pbmcQABIGCgJVcBABEgsKB0xl", - "YXZpbmcQAhILCgdFeGl0aW5nEAMSCAoERG93bhAEEgsKB1JlbW92ZWQQBRIM", - "CghXZWFrbHlVcBAGIp4BCgtWZWN0b3JDbG9jaxIRCgl0aW1lc3RhbXAYASAB", - "KAMSSwoIdmVyc2lvbnMYAiADKAsyOS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph", - "dGlvbi5Qcm90by5Nc2cuVmVjdG9yQ2xvY2suVmVyc2lvbhovCgdWZXJzaW9u", - "EhEKCWhhc2hJbmRleBgBIAEoBRIRCgl0aW1lc3RhbXAYAiABKAMiXwoNVW5p", - "cXVlQWRkcmVzcxJBCgdhZGRyZXNzGAEgASgLMjAuQWtrYS5SZW1vdGUuU2Vy", - "aWFsaXphdGlvbi5Qcm90by5Nc2cuQWRkcmVzc0RhdGESCwoDdWlkGAIgASgN", - "IqABChFDbHVzdGVyUm91dGVyUG9vbBI4CgRwb29sGAEgASgLMiouQWtrYS5D", - "bHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlBvb2wSUQoIc2V0dGlu", - "Z3MYAiABKAsyPy5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5N", - "c2cuQ2x1c3RlclJvdXRlclBvb2xTZXR0aW5ncyI8CgRQb29sEhQKDHNlcmlh", - "bGl6ZXJJZBgBIAEoDRIQCghtYW5pZmVzdBgCIAEoCRIMCgRkYXRhGAMgASgM", - "InwKGUNsdXN0ZXJSb3V0ZXJQb29sU2V0dGluZ3MSFgoOdG90YWxJbnN0YW5j", - "ZXMYASABKA0SGwoTbWF4SW5zdGFuY2VzUGVyTm9kZRgCIAEoDRIZChFhbGxv", - "d0xvY2FsUm91dGVlcxgDIAEoCBIPCgd1c2VSb2xlGAQgASgJYgZwcm90bzM=")); + "dGlvbi5Qcm90by5Nc2cuR29zc2lwInUKCUhlYXJ0YmVhdBI+CgRmcm9tGAEg", + "ASgLMjAuQWtrYS5SZW1vdGUuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuQWRk", + "cmVzc0RhdGESEgoKc2VxdWVuY2VOchgCIAEoAxIUCgxjcmVhdGlvblRpbWUY", + "AyABKBIigAEKEUhlYXJ0QmVhdFJlc3BvbnNlEkEKBGZyb20YASABKAsyMy5B", + "a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRk", + "cmVzcxISCgpzZXF1ZW5jZU5yGAIgASgDEhQKDGNyZWF0aW9uVGltZRgDIAEo", + "AyKuAQoOR29zc2lwRW52ZWxvcGUSQQoEZnJvbRgBIAEoCzIzLkFra2EuQ2x1", + "c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5VbmlxdWVBZGRyZXNzEj8K", + "AnRvGAIgASgLMjMuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8u", + "TXNnLlVuaXF1ZUFkZHJlc3MSGAoQc2VyaWFsaXplZEdvc3NpcBgDIAEoDCKo", + "AQoMR29zc2lwU3RhdHVzEkEKBGZyb20YASABKAsyMy5Ba2thLkNsdXN0ZXIu", + "U2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRkcmVzcxIRCglhbGxI", + "YXNoZXMYAiADKAkSQgoHdmVyc2lvbhgDIAEoCzIxLkFra2EuQ2x1c3Rlci5T", + "ZXJpYWxpemF0aW9uLlByb3RvLk1zZy5WZWN0b3JDbG9jayLbAgoGR29zc2lw", + "EkkKDGFsbEFkZHJlc3NlcxgBIAMoCzIzLkFra2EuQ2x1c3Rlci5TZXJpYWxp", + "emF0aW9uLlByb3RvLk1zZy5VbmlxdWVBZGRyZXNzEhAKCGFsbFJvbGVzGAIg", + "AygJEhEKCWFsbEhhc2hlcxgDIAMoCRI9CgdtZW1iZXJzGAQgAygLMiwuQWtr", + "YS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLk1lbWJlchJGCghv", + "dmVydmlldxgFIAEoCzI0LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlBy", + "b3RvLk1zZy5Hb3NzaXBPdmVydmlldxJCCgd2ZXJzaW9uGAYgASgLMjEuQWtr", + "YS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZlY3RvckNsb2Nr", + "EhYKDmFsbEFwcFZlcnNpb25zGAggAygJIngKDkdvc3NpcE92ZXJ2aWV3EgwK", + "BHNlZW4YASADKAUSWAoUb2JzZXJ2ZXJSZWFjaGFiaWxpdHkYAiADKAsyOi5B", + "a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuT2JzZXJ2ZXJS", + "ZWFjaGFiaWxpdHkilQEKFE9ic2VydmVyUmVhY2hhYmlsaXR5EhQKDGFkZHJl", + "c3NJbmRleBgBIAEoBRIPCgd2ZXJzaW9uGAQgASgDElYKE3N1YmplY3RSZWFj", + "aGFiaWxpdHkYAiADKAsyOS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q", + "cm90by5Nc2cuU3ViamVjdFJlYWNoYWJpbGl0eSLgAQoTU3ViamVjdFJlYWNo", + "YWJpbGl0eRIUCgxhZGRyZXNzSW5kZXgYASABKAUSXAoGc3RhdHVzGAMgASgO", + "MkwuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlN1Ympl", + "Y3RSZWFjaGFiaWxpdHkuUmVhY2hhYmlsaXR5U3RhdHVzEg8KB3ZlcnNpb24Y", + "BCABKAMiRAoSUmVhY2hhYmlsaXR5U3RhdHVzEg0KCVJlYWNoYWJsZRAAEg8K", + "C1VucmVhY2hhYmxlEAESDgoKVGVybWluYXRlZBACIpICCgZNZW1iZXISFAoM", + "YWRkcmVzc0luZGV4GAEgASgFEhAKCHVwTnVtYmVyGAIgASgFEkkKBnN0YXR1", + "cxgDIAEoDjI5LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1z", + "Zy5NZW1iZXIuTWVtYmVyU3RhdHVzEhgKDHJvbGVzSW5kZXhlcxgEIAMoBUIC", + "EAESFwoPYXBwVmVyc2lvbkluZGV4GAUgASgFImIKDE1lbWJlclN0YXR1cxIL", + "CgdKb2luaW5nEAASBgoCVXAQARILCgdMZWF2aW5nEAISCwoHRXhpdGluZxAD", + "EggKBERvd24QBBILCgdSZW1vdmVkEAUSDAoIV2Vha2x5VXAQBiKeAQoLVmVj", + "dG9yQ2xvY2sSEQoJdGltZXN0YW1wGAEgASgDEksKCHZlcnNpb25zGAIgAygL", + "MjkuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZlY3Rv", + "ckNsb2NrLlZlcnNpb24aLwoHVmVyc2lvbhIRCgloYXNoSW5kZXgYASABKAUS", + "EQoJdGltZXN0YW1wGAIgASgDIl8KDVVuaXF1ZUFkZHJlc3MSQQoHYWRkcmVz", + "cxgBIAEoCzIwLkFra2EuUmVtb3RlLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNn", + "LkFkZHJlc3NEYXRhEgsKA3VpZBgCIAEoDSKgAQoRQ2x1c3RlclJvdXRlclBv", + "b2wSOAoEcG9vbBgBIAEoCzIqLkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9u", + "LlByb3RvLk1zZy5Qb29sElEKCHNldHRpbmdzGAIgASgLMj8uQWtrYS5DbHVz", + "dGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLkNsdXN0ZXJSb3V0ZXJQb29s", + "U2V0dGluZ3MiPAoEUG9vbBIUCgxzZXJpYWxpemVySWQYASABKA0SEAoIbWFu", + "aWZlc3QYAiABKAkSDAoEZGF0YRgDIAEoDCJ8ChlDbHVzdGVyUm91dGVyUG9v", + "bFNldHRpbmdzEhYKDnRvdGFsSW5zdGFuY2VzGAEgASgNEhsKE21heEluc3Rh", + "bmNlc1Blck5vZGUYAiABKA0SGQoRYWxsb3dMb2NhbFJvdXRlZXMYAyABKAgS", + "DwoHdXNlUm9sZRgEIAEoCWIGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Akka.Remote.Serialization.Proto.Msg.ContainerFormatsReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Join), global::Akka.Cluster.Serialization.Proto.Msg.Join.Parser, new[]{ "Node", "Roles", "AppVersion" }, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Welcome), global::Akka.Cluster.Serialization.Proto.Msg.Welcome.Parser, new[]{ "From", "Gossip" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Heartbeat), global::Akka.Cluster.Serialization.Proto.Msg.Heartbeat.Parser, new[]{ "From", "SequenceNr", "CreationTime" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse), global::Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse.Parser, new[]{ "From", "SequenceNr", "CreationTime" }, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.GossipEnvelope), global::Akka.Cluster.Serialization.Proto.Msg.GossipEnvelope.Parser, new[]{ "From", "To", "SerializedGossip" }, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.GossipStatus), global::Akka.Cluster.Serialization.Proto.Msg.GossipStatus.Parser, new[]{ "From", "AllHashes", "Version" }, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Gossip), global::Akka.Cluster.Serialization.Proto.Msg.Gossip.Parser, new[]{ "AllAddresses", "AllRoles", "AllHashes", "Members", "Overview", "Version", "AllAppVersions" }, null, null, null), @@ -433,6 +441,378 @@ public void MergeFrom(pb::CodedInputStream input) { } + /// + ///* + /// Prior to version 1.4.19 + /// Heartbeat + /// Sends an Address + /// Version 1.4.19 can deserialize this message but does not send it + /// + internal sealed partial class Heartbeat : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new Heartbeat()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[2]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public Heartbeat() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public Heartbeat(Heartbeat other) : this() { + From = other.from_ != null ? other.From.Clone() : null; + sequenceNr_ = other.sequenceNr_; + creationTime_ = other.creationTime_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public Heartbeat Clone() { + return new Heartbeat(this); + } + + /// Field number for the "from" field. + public const int FromFieldNumber = 1; + private global::Akka.Remote.Serialization.Proto.Msg.AddressData from_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::Akka.Remote.Serialization.Proto.Msg.AddressData From { + get { return from_; } + set { + from_ = value; + } + } + + /// Field number for the "sequenceNr" field. + public const int SequenceNrFieldNumber = 2; + private long sequenceNr_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public long SequenceNr { + get { return sequenceNr_; } + set { + sequenceNr_ = value; + } + } + + /// Field number for the "creationTime" field. + public const int CreationTimeFieldNumber = 3; + private long creationTime_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public long CreationTime { + get { return creationTime_; } + set { + creationTime_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as Heartbeat); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(Heartbeat other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(From, other.From)) return false; + if (SequenceNr != other.SequenceNr) return false; + if (CreationTime != other.CreationTime) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (from_ != null) hash ^= From.GetHashCode(); + if (SequenceNr != 0L) hash ^= SequenceNr.GetHashCode(); + if (CreationTime != 0L) hash ^= CreationTime.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (from_ != null) { + output.WriteRawTag(10); + output.WriteMessage(From); + } + if (SequenceNr != 0L) { + output.WriteRawTag(16); + output.WriteInt64(SequenceNr); + } + if (CreationTime != 0L) { + output.WriteRawTag(24); + output.WriteSInt64(CreationTime); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (from_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(From); + } + if (SequenceNr != 0L) { + size += 1 + pb::CodedOutputStream.ComputeInt64Size(SequenceNr); + } + if (CreationTime != 0L) { + size += 1 + pb::CodedOutputStream.ComputeSInt64Size(CreationTime); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(Heartbeat other) { + if (other == null) { + return; + } + if (other.from_ != null) { + if (from_ == null) { + from_ = new global::Akka.Remote.Serialization.Proto.Msg.AddressData(); + } + From.MergeFrom(other.From); + } + if (other.SequenceNr != 0L) { + SequenceNr = other.SequenceNr; + } + if (other.CreationTime != 0L) { + CreationTime = other.CreationTime; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (from_ == null) { + from_ = new global::Akka.Remote.Serialization.Proto.Msg.AddressData(); + } + input.ReadMessage(from_); + break; + } + case 16: { + SequenceNr = input.ReadInt64(); + break; + } + case 24: { + CreationTime = input.ReadSInt64(); + break; + } + } + } + } + + } + + /// + ///* + /// Prior to version 1.4.19 + /// HeartbeatRsp + /// Sends an UniqueAddress + /// Version 1.4.19 can deserialize this message but does not send it + /// + internal sealed partial class HeartBeatResponse : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HeartBeatResponse()); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[3]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public HeartBeatResponse() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public HeartBeatResponse(HeartBeatResponse other) : this() { + From = other.from_ != null ? other.From.Clone() : null; + sequenceNr_ = other.sequenceNr_; + creationTime_ = other.creationTime_; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public HeartBeatResponse Clone() { + return new HeartBeatResponse(this); + } + + /// Field number for the "from" field. + public const int FromFieldNumber = 1; + private global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress from_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress From { + get { return from_; } + set { + from_ = value; + } + } + + /// Field number for the "sequenceNr" field. + public const int SequenceNrFieldNumber = 2; + private long sequenceNr_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public long SequenceNr { + get { return sequenceNr_; } + set { + sequenceNr_ = value; + } + } + + /// Field number for the "creationTime" field. + public const int CreationTimeFieldNumber = 3; + private long creationTime_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public long CreationTime { + get { return creationTime_; } + set { + creationTime_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as HeartBeatResponse); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(HeartBeatResponse other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(From, other.From)) return false; + if (SequenceNr != other.SequenceNr) return false; + if (CreationTime != other.CreationTime) return false; + return true; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (from_ != null) hash ^= From.GetHashCode(); + if (SequenceNr != 0L) hash ^= SequenceNr.GetHashCode(); + if (CreationTime != 0L) hash ^= CreationTime.GetHashCode(); + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (from_ != null) { + output.WriteRawTag(10); + output.WriteMessage(From); + } + if (SequenceNr != 0L) { + output.WriteRawTag(16); + output.WriteInt64(SequenceNr); + } + if (CreationTime != 0L) { + output.WriteRawTag(24); + output.WriteInt64(CreationTime); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (from_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(From); + } + if (SequenceNr != 0L) { + size += 1 + pb::CodedOutputStream.ComputeInt64Size(SequenceNr); + } + if (CreationTime != 0L) { + size += 1 + pb::CodedOutputStream.ComputeInt64Size(CreationTime); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(HeartBeatResponse other) { + if (other == null) { + return; + } + if (other.from_ != null) { + if (from_ == null) { + from_ = new global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress(); + } + From.MergeFrom(other.From); + } + if (other.SequenceNr != 0L) { + SequenceNr = other.SequenceNr; + } + if (other.CreationTime != 0L) { + CreationTime = other.CreationTime; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + if (from_ == null) { + from_ = new global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress(); + } + input.ReadMessage(from_); + break; + } + case 16: { + SequenceNr = input.ReadInt64(); + break; + } + case 24: { + CreationTime = input.ReadInt64(); + break; + } + } + } + } + + } + /// /// Gossip Envelope /// @@ -443,7 +823,7 @@ internal sealed partial class GossipEnvelope : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[2]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[4]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -631,7 +1011,7 @@ internal sealed partial class GossipStatus : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[3]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[5]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -811,7 +1191,7 @@ internal sealed partial class Gossip : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[4]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[6]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -1071,7 +1451,7 @@ internal sealed partial class GossipOverview : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[5]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[7]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -1207,7 +1587,7 @@ internal sealed partial class ObserverReachability : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[8]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[10]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -1803,7 +2183,7 @@ internal sealed partial class VectorClock : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[9]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[11]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -2098,7 +2478,7 @@ internal sealed partial class UniqueAddress : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[10]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[12]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -2249,7 +2629,7 @@ internal sealed partial class ClusterRouterPool : pb::IMessage { [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public static pbr::MessageDescriptor Descriptor { - get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[12]; } + get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[14]; } } [global::System.Diagnostics.DebuggerNonUserCodeAttribute] @@ -2579,7 +2959,7 @@ internal sealed partial class ClusterRouterPoolSettings : pb::IMessage(); + for (var i = 1; i < args.Length - 1; ++i) + { + if (args[i].Equals("-Dmultinode") && args[i + 1].StartsWith(".")) + { + fixedArgs.Add(args[i] + args[i+1]); + ++i; + } + } + if(fixedArgs.Count == 0) + fixedArgs.AddRange(args); + + foreach (var arg in fixedArgs) { if (!arg.StartsWith("-D")) { diff --git a/src/protobuf/ClusterMessages.proto b/src/protobuf/ClusterMessages.proto index 1b17d667b7f..4206c0f3c21 100644 --- a/src/protobuf/ClusterMessages.proto +++ b/src/protobuf/ClusterMessages.proto @@ -59,14 +59,28 @@ message Welcome { ****************************************/ /** + * Prior to version 1.4.19 * Heartbeat * Sends an Address + * Version 1.4.19 can deserialize this message but does not send it */ + message Heartbeat { + Akka.Remote.Serialization.Proto.Msg.AddressData from = 1; + int64 sequenceNr = 2; + sint64 creationTime = 3; +} /** + * Prior to version 1.4.19 * HeartbeatRsp * Sends an UniqueAddress + * Version 1.4.19 can deserialize this message but does not send it */ + message HeartBeatResponse { + UniqueAddress from = 1; + int64 sequenceNr = 2; + int64 creationTime = 3; +} /**************************************** * Cluster Gossip Messages From e2fc9381f4756df4f9e9baefd1717e4401ac8f4c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 12 Apr 2021 19:41:33 -0500 Subject: [PATCH 2/3] added updated API Spec --- .../Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt index c3f9f370ec0..f6bf19cd571 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt @@ -437,11 +437,12 @@ namespace Akka.Cluster.SBR } namespace Akka.Cluster.Serialization { - public class ClusterMessageSerializer : Akka.Serialization.Serializer + [Akka.Annotations.InternalApiAttribute()] + public class ClusterMessageSerializer : Akka.Serialization.SerializerWithStringManifest { public ClusterMessageSerializer(Akka.Actor.ExtendedActorSystem system) { } - public override bool IncludeManifest { get; } - public override object FromBinary(byte[] bytes, System.Type type) { } + public override object FromBinary(byte[] bytes, string manifest) { } + public override string Manifest(object o) { } public override byte[] ToBinary(object obj) { } } } \ No newline at end of file From 026d922cdbd6c9fc54bf894cfa5a5f2ced3bbc05 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 12 Apr 2021 20:27:05 -0500 Subject: [PATCH 3/3] increased ClusterLogSpec join timespan Increased the `TimeSpan` here to 10 seconds in order to prevent this spec from failing racily, since even an Akka.Cluster self-join can take more than the default 3 seconds due to some of the timings involved in node startup et al. --- src/core/Akka.Cluster.Tests/ClusterLogSpec.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs b/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs index 5b3926886dd..1653a45f5b8 100644 --- a/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Linq; using Akka.Actor; using Akka.Configuration; @@ -56,9 +57,12 @@ protected void AwaitUp() /// protected void Join(string expected) { - EventFilter - .Info(contains: expected) - .ExpectOne(() => _cluster.Join(_selfAddress)); + Within(TimeSpan.FromSeconds(10), () => + { + EventFilter + .Info(contains: expected) + .ExpectOne(() => _cluster.Join(_selfAddress)); + }); } ///