diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
index c3f9f370ec0..f6bf19cd571 100644
--- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
+++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
@@ -437,11 +437,12 @@ namespace Akka.Cluster.SBR
}
namespace Akka.Cluster.Serialization
{
- public class ClusterMessageSerializer : Akka.Serialization.Serializer
+ [Akka.Annotations.InternalApiAttribute()]
+ public class ClusterMessageSerializer : Akka.Serialization.SerializerWithStringManifest
{
public ClusterMessageSerializer(Akka.Actor.ExtendedActorSystem system) { }
- public override bool IncludeManifest { get; }
- public override object FromBinary(byte[] bytes, System.Type type) { }
+ public override object FromBinary(byte[] bytes, string manifest) { }
+ public override string Manifest(object o) { }
public override byte[] ToBinary(object obj) { }
}
}
\ No newline at end of file
diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
new file mode 100644
index 00000000000..90dd2c6d90c
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
@@ -0,0 +1,35 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using Akka.Actor;
+using Akka.Configuration;
+using Akka.TestKit;
+using Xunit;
+using Xunit.Abstractions;
+using static Akka.Cluster.ClusterHeartbeatSender;
+
+namespace Akka.Cluster.Tests
+{
+ public class ClusterHeartbeatReceiverSpec : AkkaSpec
+ {
+ public static Config Config = @"akka.actor.provider = cluster";
+
+ public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
+ : base(Config, output)
+ {
+
+ }
+
+ [Fact]
+ public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
+ {
+ var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
+ heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
+ ExpectMsg(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
+ }
+ }
+}
diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
new file mode 100644
index 00000000000..fdea539c91b
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
@@ -0,0 +1,66 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System.Collections.Immutable;
+using Akka.Actor;
+using Akka.Configuration;
+using Akka.TestKit;
+using Akka.Util;
+using FluentAssertions;
+using Xunit;
+using Xunit.Abstractions;
+using static Akka.Cluster.ClusterHeartbeatSender;
+
+namespace Akka.Cluster.Tests
+{
+ public class ClusterHeartbeatSenderSpec : AkkaSpec
+ {
+ class TestClusterHeartbeatSender : ClusterHeartbeatSender
+ {
+ private readonly TestProbe _probe;
+
+ public TestClusterHeartbeatSender(TestProbe probe)
+ {
+ _probe = probe;
+ }
+
+ protected override void PreStart()
+ {
+ // don't register for cluster events
+ }
+
+ protected override ActorSelection HeartbeatReceiver(Address address)
+ {
+ return Context.ActorSelection(_probe.Ref.Path);
+ }
+ }
+
+ public static readonly Config Config = @"
+ akka.loglevel = DEBUG
+ akka.actor.provider = cluster
+ akka.cluster.failure-detector.heartbeat-interval = 0.2s
+ ";
+
+ public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
+ : base(Config, output){ }
+
+ [Fact]
+ public void ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
+ {
+ var probe = CreateTestProbe();
+ var underTest = Sys.ActorOf(Props.Create(() => new TestClusterHeartbeatSender(probe)));
+
+ underTest.Tell(new ClusterEvent.CurrentClusterState());
+ underTest.Tell(new ClusterEvent.MemberUp(new Member(
+ new UniqueAddress(new Address("akka", Sys.Name), 1), 1,
+ MemberStatus.Up, ImmutableHashSet.Empty, AppVersion.Zero)));
+
+ probe.ExpectMsg().SequenceNr.Should().Be(1L);
+ probe.ExpectMsg().SequenceNr.Should().Be(2L);
+ }
+ }
+}
diff --git a/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs b/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
index 5b3926886dd..1653a45f5b8 100644
--- a/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
+++ b/src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
@@ -5,6 +5,7 @@
//
//-----------------------------------------------------------------------
+using System;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
@@ -56,9 +57,12 @@ protected void AwaitUp()
///
protected void Join(string expected)
{
- EventFilter
- .Info(contains: expected)
- .ExpectOne(() => _cluster.Join(_selfAddress));
+ Within(TimeSpan.FromSeconds(10), () =>
+ {
+ EventFilter
+ .Info(contains: expected)
+ .ExpectOne(() => _cluster.Join(_selfAddress));
+ });
}
///
diff --git a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs
index 2781b51ffb4..19dc355ff8d 100644
--- a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs
+++ b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs
@@ -8,11 +8,14 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Routing;
+using Akka.Cluster.Serialization;
using Akka.Routing;
+using Akka.Serialization;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using Akka.Util;
+using Google.Protobuf;
namespace Akka.Cluster.Tests.Serialization
{
@@ -33,19 +36,47 @@ public ClusterMessageSerializerSpec()
public void Can_serialize_Heartbeat()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
- var message = new ClusterHeartbeatSender.Heartbeat(address);
+ var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
AssertEqual(message);
}
+ [Fact]
+ public void Can_serialize_Hearbeatv1419_later()
+ {
+ var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
+ {
+ From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
+ CreationTime = 2,
+ SequenceNr = 1
+ }.ToByteArray();
+
+ var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
+ serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
+ }
+
[Fact]
public void Can_serialize_HeartbeatRsp()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var uniqueAddress = new UniqueAddress(address, 17);
- var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress);
+ var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
AssertEqual(message);
}
+ [Fact]
+ public void Can_serialize_HearbeatRspv1419_later()
+ {
+ var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
+ {
+ From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
+ CreationTime = 2,
+ SequenceNr = 1
+ }.ToByteArray();
+
+ var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
+ serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
+ }
+
[Fact]
public void Can_serialize_GossipEnvelope()
{
@@ -191,6 +222,7 @@ private T AssertAndReturn(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
+ serializer.Should().BeOfType();
return serializer.FromBinary(serialized);
}
diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs
index 52de7e074b5..5bef4d02955 100644
--- a/src/core/Akka.Cluster/Cluster.cs
+++ b/src/core/Akka.Cluster/Cluster.cs
@@ -538,10 +538,7 @@ internal void Shutdown()
LogInfo("Shutting down...");
System.Stop(_clusterDaemons);
- if (_readView != null)
- {
- _readView.Dispose();
- }
+ _readView?.Dispose();
LogInfo("Successfully shut down");
}
@@ -583,6 +580,27 @@ public InfoLogger(ILoggingAdapter log, ClusterSettings settings, Address selfAdd
_selfAddress = selfAddress;
}
+ ///
+ /// Creates an log entry with the specific message.
+ ///
+ /// The message being logged.
+ internal void LogDebug(string message)
+ {
+ if (_log.IsDebugEnabled)
+ _log.Debug("Cluster Node [{0}] - {1}", _selfAddress, message);
+ }
+
+ ///
+ /// Creates an log entry with the specific template and arguments.
+ ///
+ /// The template being rendered and logged.
+ /// The argument that fills in the template placeholder.
+ internal void LogDebug(string template, object arg1)
+ {
+ if (_log.IsDebugEnabled)
+ _log.Debug("Cluster Node [{1}] - " + template, arg1, _selfAddress);
+ }
+
///
/// Creates an log entry with the specific message.
///
diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs
index e3f0e085294..c8a1d3e9333 100644
--- a/src/core/Akka.Cluster/ClusterDaemon.cs
+++ b/src/core/Akka.Cluster/ClusterDaemon.cs
@@ -898,7 +898,7 @@ private void CreateChildren()
{
_coreSupervisor = Context.ActorOf(Props.Create(), "core");
- Context.ActorOf(Props.Create(), "heartbeatReceiver");
+ Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver");
}
protected override void PostStop()
diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs
index f4a8ef607f0..0ecf786f4a7 100644
--- a/src/core/Akka.Cluster/ClusterHeartbeat.cs
+++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs
@@ -12,6 +12,7 @@
using Akka.Actor;
using Akka.Event;
using Akka.Remote;
+using Akka.Util;
using Akka.Util.Internal;
namespace Akka.Cluster
@@ -21,33 +22,53 @@ namespace Akka.Cluster
///
/// Receives messages and replies.
///
- internal sealed class ClusterHeartbeatReceiver : ReceiveActor
+ internal sealed class ClusterHeartbeatReceiver : UntypedActor
{
- private readonly Lazy _selfHeartbeatRsp;
+ // Important - don't use Cluster.Get(Context.System) in constructor because that would
+ // cause deadlock. See startup sequence in ClusterDaemon.
+ private readonly Lazy _cluster;
+
+ public bool VerboseHeartbeat => _cluster.Value.Settings.VerboseHeartbeatLogging;
///
/// TBD
///
- public ClusterHeartbeatReceiver()
+ public ClusterHeartbeatReceiver(Func getCluster)
{
- // Important - don't use Cluster.Get(Context.System) in constructor because that would
- // cause deadlock. See startup sequence in ClusterDaemon.
- _selfHeartbeatRsp = new Lazy(() =>
- new ClusterHeartbeatSender.HeartbeatRsp(Cluster.Get(Context.System).SelfUniqueAddress));
+ _cluster = new Lazy(getCluster);
+ }
- Receive(heartbeat => Sender.Tell(_selfHeartbeatRsp.Value));
+ protected override void OnReceive(object message)
+ {
+ switch (message)
+ {
+ case ClusterHeartbeatSender.Heartbeat hb:
+ // TODO log the sequence nr once serializer is enabled
+ if(VerboseHeartbeat) _cluster.Value.CurrentInfoLogger.LogDebug("Heartbeat from [{0}]", hb.From);
+ Sender.Tell(new ClusterHeartbeatSender.HeartbeatRsp(_cluster.Value.SelfUniqueAddress,
+ hb.SequenceNr, hb.CreationTimeNanos));
+ break;
+ default:
+ Unhandled(message);
+ break;
+ }
}
+
+ public static Props Props(Func getCluster)
+ {
+ return Akka.Actor.Props.Create(() => new ClusterHeartbeatReceiver(getCluster));
+ }
+
}
///
/// INTERNAL API
///
- internal sealed class ClusterHeartbeatSender : ReceiveActor
+ internal class ClusterHeartbeatSender : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Cluster _cluster;
private readonly IFailureDetectorRegistry _failureDetector;
- private readonly Heartbeat _selfHeartbeat;
private ClusterHeartbeatSenderState _state;
private readonly ICancelable _heartbeatTask;
@@ -66,8 +87,6 @@ public ClusterHeartbeatSender()
// the failureDetector is only updated by this actor, but read from other places
_failureDetector = _cluster.FailureDetector;
- _selfHeartbeat = new Heartbeat(_cluster.SelfAddress);
-
_state = new ClusterHeartbeatSenderState(
ring: new HeartbeatNodeRing(
_cluster.SelfUniqueAddress,
@@ -88,6 +107,13 @@ public ClusterHeartbeatSender()
Initializing();
}
+ private long _seqNo;
+ private Heartbeat SelfHeartbeat()
+ {
+ _seqNo += 1;
+ return new Heartbeat(_cluster.SelfAddress, _seqNo, MonotonicClock.GetNanos());
+ }
+
///
/// TBD
///
@@ -112,7 +138,7 @@ protected override void PostStop()
///
/// Looks up and returns the remote cluster heartbeat connection for the specific address.
///
- private ActorSelection HeartbeatReceiver(Address address)
+ protected virtual ActorSelection HeartbeatReceiver(Address address)
{
return Context.ActorSelection(new RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver");
}
@@ -133,7 +159,7 @@ private void Initializing()
private void Active()
{
Receive(tick => DoHeartbeat());
- Receive(rsp => DoHeartbeatRsp(rsp.From));
+ Receive(rsp => DoHeartbeatRsp(rsp));
Receive(removed => RemoveMember(removed.Member));
Receive(evt => AddMember(evt.Member));
Receive(m => UnreachableMember(m.Member));
@@ -204,7 +230,7 @@ private void DoHeartbeat()
new ExpectedFirstHeartbeat(to),
Self);
}
- HeartbeatReceiver(to.Address).Tell(_selfHeartbeat);
+ HeartbeatReceiver(to.Address).Tell(SelfHeartbeat());
}
CheckTickInterval();
@@ -227,13 +253,14 @@ private void CheckTickInterval()
_tickTimestamp = DateTime.UtcNow;
}
- private void DoHeartbeatRsp(UniqueAddress from)
+ private void DoHeartbeatRsp(HeartbeatRsp rsp)
{
if (_cluster.Settings.VerboseHeartbeatLogging)
{
- _log.Debug("Cluster Node [{0}] - Heartbeat response from [{1}]", _cluster.SelfAddress, from.Address);
+ // TODO: log response time and validate sequence nrs once serialisation of sendTime is released
+ _log.Debug("Cluster Node [{0}] - Heartbeat response from [{1}]", _cluster.SelfAddress, rsp.From.Address);
}
- _state = _state.HeartbeatRsp(from);
+ _state = _state.HeartbeatRsp(rsp.From);
}
private void TriggerFirstHeart(UniqueAddress from)
@@ -253,70 +280,85 @@ private void TriggerFirstHeart(UniqueAddress from)
///
/// Sent at regular intervals for failure detection
///
- internal sealed class Heartbeat : IClusterMessage, IPriorityMessage, IDeadLetterSuppression
+ internal sealed class Heartbeat : IClusterMessage, IPriorityMessage, IDeadLetterSuppression, IEquatable
{
- ///
- /// TBD
- ///
- /// TBD
- public Heartbeat(Address from)
+ public Heartbeat(Address from, long sequenceNr, long creationTimeNanos)
{
From = from;
+ SequenceNr = sequenceNr;
+ CreationTimeNanos = creationTimeNanos;
}
- ///
- /// TBD
- ///
public Address From { get; }
-#pragma warning disable 659 //there might very well be multiple heartbeats from the same address. overriding GetHashCode may have uninteded side effects
- ///
+ public long SequenceNr { get; }
+
+ public long CreationTimeNanos { get; }
+
+ public bool Equals(Heartbeat other)
+ {
+ if (ReferenceEquals(null, other)) return false;
+ if (ReferenceEquals(this, other)) return true;
+ return From.Equals(other.From) && SequenceNr == other.SequenceNr && CreationTimeNanos == other.CreationTimeNanos;
+ }
+
public override bool Equals(object obj)
-#pragma warning restore 659
{
- if (ReferenceEquals(null, obj)) return false;
- if (ReferenceEquals(this, obj)) return true;
- return obj is Heartbeat && Equals((Heartbeat)obj);
+ return ReferenceEquals(this, obj) || obj is Heartbeat other && Equals(other);
}
- private bool Equals(Heartbeat other)
+ public override int GetHashCode()
{
- return Equals(From, other.From);
+ unchecked
+ {
+ var hashCode = From.GetHashCode();
+ hashCode = (hashCode * 397) ^ SequenceNr.GetHashCode();
+ hashCode = (hashCode * 397) ^ CreationTimeNanos.GetHashCode();
+ return hashCode;
+ }
}
}
///
/// Sends replies to messages
///
- internal sealed class HeartbeatRsp : IClusterMessage, IPriorityMessage, IDeadLetterSuppression
+ internal sealed class HeartbeatRsp : IClusterMessage, IPriorityMessage, IDeadLetterSuppression, IEquatable
{
- ///
- /// TBD
- ///
- /// TBD
- public HeartbeatRsp(UniqueAddress from)
+ public HeartbeatRsp(UniqueAddress from, long sequenceNr, long creationTimeNanos)
{
From = from;
+ SequenceNr = sequenceNr;
+ CreationTimeNanos = creationTimeNanos;
}
- ///
- /// TBD
- ///
public UniqueAddress From { get; }
-#pragma warning disable 659 //there might very well be multiple heartbeats from the same address. overriding GetHashCode may have uninteded side effects
- ///
+ public long SequenceNr { get; }
+
+ public long CreationTimeNanos { get; }
+
+ public bool Equals(HeartbeatRsp other)
+ {
+ if (ReferenceEquals(null, other)) return false;
+ if (ReferenceEquals(this, other)) return true;
+ return From.Equals(other.From) && SequenceNr == other.SequenceNr
+ && CreationTimeNanos == other.CreationTimeNanos;
+ }
+
public override bool Equals(object obj)
-#pragma warning restore 659
{
- if (ReferenceEquals(null, obj)) return false;
- if (ReferenceEquals(this, obj)) return true;
- return obj is HeartbeatRsp && Equals((HeartbeatRsp)obj);
+ return ReferenceEquals(this, obj) || obj is HeartbeatRsp other && Equals(other);
}
- private bool Equals(HeartbeatRsp other)
+ public override int GetHashCode()
{
- return Equals(From, other.From);
+ unchecked
+ {
+ var hashCode = From.GetHashCode();
+ hashCode = (hashCode * 397) ^ SequenceNr.GetHashCode();
+ hashCode = (hashCode * 397) ^ CreationTimeNanos.GetHashCode();
+ return hashCode;
+ }
}
}
diff --git a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
index dae1b3a199e..f3d7e443262 100644
--- a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
+++ b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs
@@ -12,41 +12,55 @@
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using Akka.Actor;
-using Akka.Cluster.Routing;
+using Akka.Annotations;
+using Akka.Cluster.Serialization.Proto.Msg;
using Akka.Serialization;
using Akka.Util;
using Akka.Util.Internal;
using Google.Protobuf;
using AddressData = Akka.Remote.Serialization.Proto.Msg.AddressData;
+using ClusterRouterPool = Akka.Cluster.Routing.ClusterRouterPool;
+using ClusterRouterPoolSettings = Akka.Cluster.Routing.ClusterRouterPoolSettings;
namespace Akka.Cluster.Serialization
{
- public class ClusterMessageSerializer : Serializer
+ [InternalApi]
+ public class ClusterMessageSerializer : SerializerWithStringManifest
{
- private readonly Dictionary> _fromBinaryMap;
+ /*
+ * BUG: should have been a SerializerWithStringManifest this entire time
+ * Since it wasn't need to include full type names for backwards compatibility
+ */
+ internal const string JoinManifest = "Akka.Cluster.InternalClusterAction+Join, Akka.Cluster";
+ internal const string WelcomeManifest = "Akka.Cluster.InternalClusterAction+Welcome, Akka.Cluster";
+ internal const string LeaveManifest = "Akka.Cluster.ClusterUserAction+Leave, Akka.Cluster";
+ internal const string DownManifest = "Akka.Cluster.ClusterUserAction+Down, Akka.Cluster";
+
+ internal const string InitJoinManifest = "Akka.Cluster.InternalClusterAction+InitJoin, Akka.Cluster";
+
+ internal const string InitJoinAckManifest = "Akka.Cluster.InternalClusterAction+InitJoinAck, Akka.Cluster";
+
+ internal const string InitJoinNackManifest = "Akka.Cluster.InternalClusterAction+InitJoinNack, Akka.Cluster";
+
+ // TODO: remove in a future version of Akka.NET (2.0)
+ internal const string HeartBeatManifestPre1419 = "Akka.Cluster.ClusterHeartbeatSender+Heartbeat, Akka.Cluster";
+ internal const string HeartBeatRspManifestPre1419 = "Akka.Cluster.ClusterHeartbeatSender+HeartbeatRsp, Akka.Cluster";
+
+ internal const string HeartBeatManifest = "HB";
+ internal const string HeartBeatRspManifest = "HBR";
+
+ internal const string ExitingConfirmedManifest = "Akka.Cluster.InternalClusterAction+ExitingConfirmed, Akka.Cluster";
+
+ internal const string GossipStatusManifest = "Akka.Cluster.GossipStatus, Akka.Cluster";
+ internal const string GossipEnvelopeManifest = "Akka.Cluster.GossipEnvelope, Akka.Cluster";
+ internal const string ClusterRouterPoolManifest = "Akka.Cluster.Routing.ClusterRouterPool, Akka.Cluster";
+
public ClusterMessageSerializer(ExtendedActorSystem system) : base(system)
{
- _fromBinaryMap = new Dictionary>
- {
- [typeof(ClusterHeartbeatSender.Heartbeat)] = bytes => new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
- [typeof(ClusterHeartbeatSender.HeartbeatRsp)] = bytes => new ClusterHeartbeatSender.HeartbeatRsp(UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes))),
- [typeof(GossipEnvelope)] = GossipEnvelopeFrom,
- [typeof(GossipStatus)] = GossipStatusFrom,
- [typeof(InternalClusterAction.Join)] = JoinFrom,
- [typeof(InternalClusterAction.Welcome)] = WelcomeFrom,
- [typeof(ClusterUserAction.Leave)] = bytes => new ClusterUserAction.Leave(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
- [typeof(ClusterUserAction.Down)] = bytes => new ClusterUserAction.Down(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
- [typeof(InternalClusterAction.InitJoin)] = bytes => new InternalClusterAction.InitJoin(),
- [typeof(InternalClusterAction.InitJoinAck)] = bytes => new InternalClusterAction.InitJoinAck(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
- [typeof(InternalClusterAction.InitJoinNack)] = bytes => new InternalClusterAction.InitJoinNack(AddressFrom(AddressData.Parser.ParseFrom(bytes))),
- [typeof(InternalClusterAction.ExitingConfirmed)] = bytes => new InternalClusterAction.ExitingConfirmed(UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes))),
- [typeof(ClusterRouterPool)] = ClusterRouterPoolFrom
- };
- }
-
- public override bool IncludeManifest => true;
+
+ }
public override byte[] ToBinary(object obj)
{
@@ -83,12 +97,79 @@ public override byte[] ToBinary(object obj)
}
}
- public override object FromBinary(byte[] bytes, Type type)
+ public override object FromBinary(byte[] bytes, string manifest)
{
- if (_fromBinaryMap.TryGetValue(type, out var factory))
- return factory(bytes);
+ switch (manifest)
+ {
+ case HeartBeatManifestPre1419:
+ return DeserializeHeartbeatAsAddress(bytes);
+ case HeartBeatRspManifestPre1419:
+ return DeserializeHeartbeatRspAsUniqueAddress(bytes);
+ case HeartBeatManifest:
+ return DeserializeHeartbeat(bytes);
+ case HeartBeatRspManifest:
+ return DeserializeHeartbeatRsp(bytes);
+ case GossipStatusManifest:
+ return GossipStatusFrom(bytes);
+ case GossipEnvelopeManifest:
+ return GossipEnvelopeFrom(bytes);
+ case InitJoinManifest:
+ return new InternalClusterAction.InitJoin();
+ case InitJoinAckManifest:
+ return new InternalClusterAction.InitJoinAck(AddressFrom(AddressData.Parser.ParseFrom(bytes)));
+ case InitJoinNackManifest:
+ return new InternalClusterAction.InitJoinNack(AddressFrom(AddressData.Parser.ParseFrom(bytes)));
+ case JoinManifest:
+ return JoinFrom(bytes);
+ case WelcomeManifest:
+ return WelcomeFrom(bytes);
+ case LeaveManifest:
+ return new ClusterUserAction.Leave(AddressFrom(AddressData.Parser.ParseFrom(bytes)));
+ case DownManifest:
+ return new ClusterUserAction.Down(AddressFrom(AddressData.Parser.ParseFrom(bytes)));
+ case ExitingConfirmedManifest:
+ return new InternalClusterAction.ExitingConfirmed(
+ UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes)));
+ case ClusterRouterPoolManifest:
+ return ClusterRouterPoolFrom(bytes);
+ default:
+ throw new ArgumentException($"Unknown manifest [{manifest}] in [{nameof(ClusterMessageSerializer)}]");
+ }
+ }
- throw new SerializationException($"{nameof(ClusterMessageSerializer)} cannot deserialize object of type {type}");
+ public override string Manifest(object o)
+ {
+ switch (o)
+ {
+ case InternalClusterAction.Join _:
+ return JoinManifest;
+ case InternalClusterAction.Welcome _:
+ return WelcomeManifest;
+ case ClusterUserAction.Leave _:
+ return LeaveManifest;
+ case ClusterUserAction.Down _:
+ return DownManifest;
+ case InternalClusterAction.InitJoin _:
+ return InitJoinManifest;
+ case InternalClusterAction.InitJoinAck _:
+ return InitJoinAckManifest;
+ case InternalClusterAction.InitJoinNack _:
+ return InitJoinNackManifest;
+ case ClusterHeartbeatSender.Heartbeat _:
+ return HeartBeatManifestPre1419;
+ case ClusterHeartbeatSender.HeartbeatRsp _:
+ return HeartBeatRspManifestPre1419;
+ case InternalClusterAction.ExitingConfirmed _:
+ return ExitingConfirmedManifest;
+ case GossipStatus _:
+ return GossipStatusManifest;
+ case GossipEnvelope _:
+ return GossipEnvelopeManifest;
+ case ClusterRouterPool _:
+ return ClusterRouterPoolManifest;
+ default:
+ throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{GetType()}]");
+ }
}
//
@@ -110,6 +191,7 @@ private static InternalClusterAction.Join JoinFrom(byte[] bytes)
return new InternalClusterAction.Join(UniqueAddressFrom(join.Node), join.Roles.ToImmutableHashSet(), ver);
}
+ // TODO: need to gzip compress the Welcome message for large clusters
private static byte[] WelcomeMessageBuilder(InternalClusterAction.Welcome welcome)
{
var welcomeProto = new Proto.Msg.Welcome();
@@ -366,12 +448,38 @@ private static int MapWithErrorMessage(Dictionary map, T value, strin
throw new ArgumentException($"Unknown {unknown} [{value}] in cluster message");
}
+ //
+ // Heartbeat
+ //
+ private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRspAsUniqueAddress(byte[] bytes)
+ {
+ var uniqueAddress = UniqueAddressFrom(Proto.Msg.UniqueAddress.Parser.ParseFrom(bytes));
+ return new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
+ }
+
+ private static ClusterHeartbeatSender.HeartbeatRsp DeserializeHeartbeatRsp(byte[] bytes)
+ {
+ var hbsp = HeartBeatResponse.Parser.ParseFrom(bytes);
+ return new ClusterHeartbeatSender.HeartbeatRsp(UniqueAddressFrom(hbsp.From), hbsp.SequenceNr, hbsp.CreationTime);
+ }
+
+ private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeatAsAddress(byte[] bytes)
+ {
+ return new ClusterHeartbeatSender.Heartbeat(AddressFrom(AddressData.Parser.ParseFrom(bytes)), -1, -1);
+ }
+
+ private static ClusterHeartbeatSender.Heartbeat DeserializeHeartbeat(byte[] bytes)
+ {
+ var hb = Heartbeat.Parser.ParseFrom(bytes);
+ return new ClusterHeartbeatSender.Heartbeat(AddressFrom(hb.From), hb.SequenceNr, hb.CreationTime);
+ }
+
//
// Address
//
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static AddressData AddressToProto(Address address)
+ internal static AddressData AddressToProto(Address address)
{
var message = new AddressData();
message.System = address.System;
@@ -382,7 +490,7 @@ private static AddressData AddressToProto(Address address)
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static Address AddressFrom(AddressData addressProto)
+ internal static Address AddressFrom(AddressData addressProto)
{
return new Address(
addressProto.Protocol,
@@ -392,7 +500,7 @@ private static Address AddressFrom(AddressData addressProto)
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress)
+ internal static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress)
{
var message = new Proto.Msg.UniqueAddress();
message.Address = AddressToProto(uniqueAddress.Address);
@@ -401,7 +509,7 @@ private static Proto.Msg.UniqueAddress UniqueAddressToProto(UniqueAddress unique
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAddressProto)
+ internal static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAddressProto)
{
return new UniqueAddress(AddressFrom(uniqueAddressProto.Address), (int)uniqueAddressProto.Uid);
}
@@ -409,8 +517,7 @@ private static UniqueAddress UniqueAddressFrom(Proto.Msg.UniqueAddress uniqueAdd
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string GetObjectManifest(Serializer serializer, object obj)
{
- var manifestSerializer = serializer as SerializerWithStringManifest;
- if (manifestSerializer != null)
+ if (serializer is SerializerWithStringManifest manifestSerializer)
{
return manifestSerializer.Manifest(obj);
}
diff --git a/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs b/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs
index fb96c5a3959..bf5059bfda8 100644
--- a/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs
+++ b/src/core/Akka.Cluster/Serialization/Proto/ClusterMessages.g.cs
@@ -29,58 +29,66 @@ static ClusterMessagesReflection() {
"cHBWZXJzaW9uGAMgASgJIooBCgdXZWxjb21lEkEKBGZyb20YASABKAsyMy5B",
"a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRk",
"cmVzcxI8CgZnb3NzaXAYAiABKAsyLC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph",
- "dGlvbi5Qcm90by5Nc2cuR29zc2lwIq4BCg5Hb3NzaXBFbnZlbG9wZRJBCgRm",
- "cm9tGAEgASgLMjMuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8u",
- "TXNnLlVuaXF1ZUFkZHJlc3MSPwoCdG8YAiABKAsyMy5Ba2thLkNsdXN0ZXIu",
- "U2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRkcmVzcxIYChBzZXJp",
- "YWxpemVkR29zc2lwGAMgASgMIqgBCgxHb3NzaXBTdGF0dXMSQQoEZnJvbRgB",
- "IAEoCzIzLkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5V",
- "bmlxdWVBZGRyZXNzEhEKCWFsbEhhc2hlcxgCIAMoCRJCCgd2ZXJzaW9uGAMg",
- "ASgLMjEuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZl",
- "Y3RvckNsb2NrItsCCgZHb3NzaXASSQoMYWxsQWRkcmVzc2VzGAEgAygLMjMu",
- "QWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlVuaXF1ZUFk",
- "ZHJlc3MSEAoIYWxsUm9sZXMYAiADKAkSEQoJYWxsSGFzaGVzGAMgAygJEj0K",
- "B21lbWJlcnMYBCADKAsyLC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q",
- "cm90by5Nc2cuTWVtYmVyEkYKCG92ZXJ2aWV3GAUgASgLMjQuQWtrYS5DbHVz",
- "dGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLkdvc3NpcE92ZXJ2aWV3EkIK",
- "B3ZlcnNpb24YBiABKAsyMS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q",
- "cm90by5Nc2cuVmVjdG9yQ2xvY2sSFgoOYWxsQXBwVmVyc2lvbnMYCCADKAki",
- "eAoOR29zc2lwT3ZlcnZpZXcSDAoEc2VlbhgBIAMoBRJYChRvYnNlcnZlclJl",
- "YWNoYWJpbGl0eRgCIAMoCzI6LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9u",
- "LlByb3RvLk1zZy5PYnNlcnZlclJlYWNoYWJpbGl0eSKVAQoUT2JzZXJ2ZXJS",
- "ZWFjaGFiaWxpdHkSFAoMYWRkcmVzc0luZGV4GAEgASgFEg8KB3ZlcnNpb24Y",
- "BCABKAMSVgoTc3ViamVjdFJlYWNoYWJpbGl0eRgCIAMoCzI5LkFra2EuQ2x1",
- "c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5TdWJqZWN0UmVhY2hhYmls",
- "aXR5IuABChNTdWJqZWN0UmVhY2hhYmlsaXR5EhQKDGFkZHJlc3NJbmRleBgB",
- "IAEoBRJcCgZzdGF0dXMYAyABKA4yTC5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph",
- "dGlvbi5Qcm90by5Nc2cuU3ViamVjdFJlYWNoYWJpbGl0eS5SZWFjaGFiaWxp",
- "dHlTdGF0dXMSDwoHdmVyc2lvbhgEIAEoAyJEChJSZWFjaGFiaWxpdHlTdGF0",
- "dXMSDQoJUmVhY2hhYmxlEAASDwoLVW5yZWFjaGFibGUQARIOCgpUZXJtaW5h",
- "dGVkEAIikgIKBk1lbWJlchIUCgxhZGRyZXNzSW5kZXgYASABKAUSEAoIdXBO",
- "dW1iZXIYAiABKAUSSQoGc3RhdHVzGAMgASgOMjkuQWtrYS5DbHVzdGVyLlNl",
- "cmlhbGl6YXRpb24uUHJvdG8uTXNnLk1lbWJlci5NZW1iZXJTdGF0dXMSGAoM",
- "cm9sZXNJbmRleGVzGAQgAygFQgIQARIXCg9hcHBWZXJzaW9uSW5kZXgYBSAB",
- "KAUiYgoMTWVtYmVyU3RhdHVzEgsKB0pvaW5pbmcQABIGCgJVcBABEgsKB0xl",
- "YXZpbmcQAhILCgdFeGl0aW5nEAMSCAoERG93bhAEEgsKB1JlbW92ZWQQBRIM",
- "CghXZWFrbHlVcBAGIp4BCgtWZWN0b3JDbG9jaxIRCgl0aW1lc3RhbXAYASAB",
- "KAMSSwoIdmVyc2lvbnMYAiADKAsyOS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXph",
- "dGlvbi5Qcm90by5Nc2cuVmVjdG9yQ2xvY2suVmVyc2lvbhovCgdWZXJzaW9u",
- "EhEKCWhhc2hJbmRleBgBIAEoBRIRCgl0aW1lc3RhbXAYAiABKAMiXwoNVW5p",
- "cXVlQWRkcmVzcxJBCgdhZGRyZXNzGAEgASgLMjAuQWtrYS5SZW1vdGUuU2Vy",
- "aWFsaXphdGlvbi5Qcm90by5Nc2cuQWRkcmVzc0RhdGESCwoDdWlkGAIgASgN",
- "IqABChFDbHVzdGVyUm91dGVyUG9vbBI4CgRwb29sGAEgASgLMiouQWtrYS5D",
- "bHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlBvb2wSUQoIc2V0dGlu",
- "Z3MYAiABKAsyPy5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5N",
- "c2cuQ2x1c3RlclJvdXRlclBvb2xTZXR0aW5ncyI8CgRQb29sEhQKDHNlcmlh",
- "bGl6ZXJJZBgBIAEoDRIQCghtYW5pZmVzdBgCIAEoCRIMCgRkYXRhGAMgASgM",
- "InwKGUNsdXN0ZXJSb3V0ZXJQb29sU2V0dGluZ3MSFgoOdG90YWxJbnN0YW5j",
- "ZXMYASABKA0SGwoTbWF4SW5zdGFuY2VzUGVyTm9kZRgCIAEoDRIZChFhbGxv",
- "d0xvY2FsUm91dGVlcxgDIAEoCBIPCgd1c2VSb2xlGAQgASgJYgZwcm90bzM="));
+ "dGlvbi5Qcm90by5Nc2cuR29zc2lwInUKCUhlYXJ0YmVhdBI+CgRmcm9tGAEg",
+ "ASgLMjAuQWtrYS5SZW1vdGUuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuQWRk",
+ "cmVzc0RhdGESEgoKc2VxdWVuY2VOchgCIAEoAxIUCgxjcmVhdGlvblRpbWUY",
+ "AyABKBIigAEKEUhlYXJ0QmVhdFJlc3BvbnNlEkEKBGZyb20YASABKAsyMy5B",
+ "a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRk",
+ "cmVzcxISCgpzZXF1ZW5jZU5yGAIgASgDEhQKDGNyZWF0aW9uVGltZRgDIAEo",
+ "AyKuAQoOR29zc2lwRW52ZWxvcGUSQQoEZnJvbRgBIAEoCzIzLkFra2EuQ2x1",
+ "c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1zZy5VbmlxdWVBZGRyZXNzEj8K",
+ "AnRvGAIgASgLMjMuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8u",
+ "TXNnLlVuaXF1ZUFkZHJlc3MSGAoQc2VyaWFsaXplZEdvc3NpcBgDIAEoDCKo",
+ "AQoMR29zc2lwU3RhdHVzEkEKBGZyb20YASABKAsyMy5Ba2thLkNsdXN0ZXIu",
+ "U2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuVW5pcXVlQWRkcmVzcxIRCglhbGxI",
+ "YXNoZXMYAiADKAkSQgoHdmVyc2lvbhgDIAEoCzIxLkFra2EuQ2x1c3Rlci5T",
+ "ZXJpYWxpemF0aW9uLlByb3RvLk1zZy5WZWN0b3JDbG9jayLbAgoGR29zc2lw",
+ "EkkKDGFsbEFkZHJlc3NlcxgBIAMoCzIzLkFra2EuQ2x1c3Rlci5TZXJpYWxp",
+ "emF0aW9uLlByb3RvLk1zZy5VbmlxdWVBZGRyZXNzEhAKCGFsbFJvbGVzGAIg",
+ "AygJEhEKCWFsbEhhc2hlcxgDIAMoCRI9CgdtZW1iZXJzGAQgAygLMiwuQWtr",
+ "YS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLk1lbWJlchJGCghv",
+ "dmVydmlldxgFIAEoCzI0LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlBy",
+ "b3RvLk1zZy5Hb3NzaXBPdmVydmlldxJCCgd2ZXJzaW9uGAYgASgLMjEuQWtr",
+ "YS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZlY3RvckNsb2Nr",
+ "EhYKDmFsbEFwcFZlcnNpb25zGAggAygJIngKDkdvc3NpcE92ZXJ2aWV3EgwK",
+ "BHNlZW4YASADKAUSWAoUb2JzZXJ2ZXJSZWFjaGFiaWxpdHkYAiADKAsyOi5B",
+ "a2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Qcm90by5Nc2cuT2JzZXJ2ZXJS",
+ "ZWFjaGFiaWxpdHkilQEKFE9ic2VydmVyUmVhY2hhYmlsaXR5EhQKDGFkZHJl",
+ "c3NJbmRleBgBIAEoBRIPCgd2ZXJzaW9uGAQgASgDElYKE3N1YmplY3RSZWFj",
+ "aGFiaWxpdHkYAiADKAsyOS5Ba2thLkNsdXN0ZXIuU2VyaWFsaXphdGlvbi5Q",
+ "cm90by5Nc2cuU3ViamVjdFJlYWNoYWJpbGl0eSLgAQoTU3ViamVjdFJlYWNo",
+ "YWJpbGl0eRIUCgxhZGRyZXNzSW5kZXgYASABKAUSXAoGc3RhdHVzGAMgASgO",
+ "MkwuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlN1Ympl",
+ "Y3RSZWFjaGFiaWxpdHkuUmVhY2hhYmlsaXR5U3RhdHVzEg8KB3ZlcnNpb24Y",
+ "BCABKAMiRAoSUmVhY2hhYmlsaXR5U3RhdHVzEg0KCVJlYWNoYWJsZRAAEg8K",
+ "C1VucmVhY2hhYmxlEAESDgoKVGVybWluYXRlZBACIpICCgZNZW1iZXISFAoM",
+ "YWRkcmVzc0luZGV4GAEgASgFEhAKCHVwTnVtYmVyGAIgASgFEkkKBnN0YXR1",
+ "cxgDIAEoDjI5LkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9uLlByb3RvLk1z",
+ "Zy5NZW1iZXIuTWVtYmVyU3RhdHVzEhgKDHJvbGVzSW5kZXhlcxgEIAMoBUIC",
+ "EAESFwoPYXBwVmVyc2lvbkluZGV4GAUgASgFImIKDE1lbWJlclN0YXR1cxIL",
+ "CgdKb2luaW5nEAASBgoCVXAQARILCgdMZWF2aW5nEAISCwoHRXhpdGluZxAD",
+ "EggKBERvd24QBBILCgdSZW1vdmVkEAUSDAoIV2Vha2x5VXAQBiKeAQoLVmVj",
+ "dG9yQ2xvY2sSEQoJdGltZXN0YW1wGAEgASgDEksKCHZlcnNpb25zGAIgAygL",
+ "MjkuQWtrYS5DbHVzdGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLlZlY3Rv",
+ "ckNsb2NrLlZlcnNpb24aLwoHVmVyc2lvbhIRCgloYXNoSW5kZXgYASABKAUS",
+ "EQoJdGltZXN0YW1wGAIgASgDIl8KDVVuaXF1ZUFkZHJlc3MSQQoHYWRkcmVz",
+ "cxgBIAEoCzIwLkFra2EuUmVtb3RlLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNn",
+ "LkFkZHJlc3NEYXRhEgsKA3VpZBgCIAEoDSKgAQoRQ2x1c3RlclJvdXRlclBv",
+ "b2wSOAoEcG9vbBgBIAEoCzIqLkFra2EuQ2x1c3Rlci5TZXJpYWxpemF0aW9u",
+ "LlByb3RvLk1zZy5Qb29sElEKCHNldHRpbmdzGAIgASgLMj8uQWtrYS5DbHVz",
+ "dGVyLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnLkNsdXN0ZXJSb3V0ZXJQb29s",
+ "U2V0dGluZ3MiPAoEUG9vbBIUCgxzZXJpYWxpemVySWQYASABKA0SEAoIbWFu",
+ "aWZlc3QYAiABKAkSDAoEZGF0YRgDIAEoDCJ8ChlDbHVzdGVyUm91dGVyUG9v",
+ "bFNldHRpbmdzEhYKDnRvdGFsSW5zdGFuY2VzGAEgASgNEhsKE21heEluc3Rh",
+ "bmNlc1Blck5vZGUYAiABKA0SGQoRYWxsb3dMb2NhbFJvdXRlZXMYAyABKAgS",
+ "DwoHdXNlUm9sZRgEIAEoCWIGcHJvdG8z"));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Akka.Remote.Serialization.Proto.Msg.ContainerFormatsReflection.Descriptor, },
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Join), global::Akka.Cluster.Serialization.Proto.Msg.Join.Parser, new[]{ "Node", "Roles", "AppVersion" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Welcome), global::Akka.Cluster.Serialization.Proto.Msg.Welcome.Parser, new[]{ "From", "Gossip" }, null, null, null),
+ new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Heartbeat), global::Akka.Cluster.Serialization.Proto.Msg.Heartbeat.Parser, new[]{ "From", "SequenceNr", "CreationTime" }, null, null, null),
+ new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse), global::Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse.Parser, new[]{ "From", "SequenceNr", "CreationTime" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.GossipEnvelope), global::Akka.Cluster.Serialization.Proto.Msg.GossipEnvelope.Parser, new[]{ "From", "To", "SerializedGossip" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.GossipStatus), global::Akka.Cluster.Serialization.Proto.Msg.GossipStatus.Parser, new[]{ "From", "AllHashes", "Version" }, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Akka.Cluster.Serialization.Proto.Msg.Gossip), global::Akka.Cluster.Serialization.Proto.Msg.Gossip.Parser, new[]{ "AllAddresses", "AllRoles", "AllHashes", "Members", "Overview", "Version", "AllAppVersions" }, null, null, null),
@@ -433,6 +441,378 @@ public void MergeFrom(pb::CodedInputStream input) {
}
+ ///
+ ///*
+ /// Prior to version 1.4.19
+ /// Heartbeat
+ /// Sends an Address
+ /// Version 1.4.19 can deserialize this message but does not send it
+ ///
+ internal sealed partial class Heartbeat : pb::IMessage {
+ private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new Heartbeat());
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pb::MessageParser Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[2]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public Heartbeat() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public Heartbeat(Heartbeat other) : this() {
+ From = other.from_ != null ? other.From.Clone() : null;
+ sequenceNr_ = other.sequenceNr_;
+ creationTime_ = other.creationTime_;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public Heartbeat Clone() {
+ return new Heartbeat(this);
+ }
+
+ /// Field number for the "from" field.
+ public const int FromFieldNumber = 1;
+ private global::Akka.Remote.Serialization.Proto.Msg.AddressData from_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public global::Akka.Remote.Serialization.Proto.Msg.AddressData From {
+ get { return from_; }
+ set {
+ from_ = value;
+ }
+ }
+
+ /// Field number for the "sequenceNr" field.
+ public const int SequenceNrFieldNumber = 2;
+ private long sequenceNr_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public long SequenceNr {
+ get { return sequenceNr_; }
+ set {
+ sequenceNr_ = value;
+ }
+ }
+
+ /// Field number for the "creationTime" field.
+ public const int CreationTimeFieldNumber = 3;
+ private long creationTime_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public long CreationTime {
+ get { return creationTime_; }
+ set {
+ creationTime_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override bool Equals(object other) {
+ return Equals(other as Heartbeat);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Equals(Heartbeat other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if (!object.Equals(From, other.From)) return false;
+ if (SequenceNr != other.SequenceNr) return false;
+ if (CreationTime != other.CreationTime) return false;
+ return true;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override int GetHashCode() {
+ int hash = 1;
+ if (from_ != null) hash ^= From.GetHashCode();
+ if (SequenceNr != 0L) hash ^= SequenceNr.GetHashCode();
+ if (CreationTime != 0L) hash ^= CreationTime.GetHashCode();
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void WriteTo(pb::CodedOutputStream output) {
+ if (from_ != null) {
+ output.WriteRawTag(10);
+ output.WriteMessage(From);
+ }
+ if (SequenceNr != 0L) {
+ output.WriteRawTag(16);
+ output.WriteInt64(SequenceNr);
+ }
+ if (CreationTime != 0L) {
+ output.WriteRawTag(24);
+ output.WriteSInt64(CreationTime);
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int CalculateSize() {
+ int size = 0;
+ if (from_ != null) {
+ size += 1 + pb::CodedOutputStream.ComputeMessageSize(From);
+ }
+ if (SequenceNr != 0L) {
+ size += 1 + pb::CodedOutputStream.ComputeInt64Size(SequenceNr);
+ }
+ if (CreationTime != 0L) {
+ size += 1 + pb::CodedOutputStream.ComputeSInt64Size(CreationTime);
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(Heartbeat other) {
+ if (other == null) {
+ return;
+ }
+ if (other.from_ != null) {
+ if (from_ == null) {
+ from_ = new global::Akka.Remote.Serialization.Proto.Msg.AddressData();
+ }
+ From.MergeFrom(other.From);
+ }
+ if (other.SequenceNr != 0L) {
+ SequenceNr = other.SequenceNr;
+ }
+ if (other.CreationTime != 0L) {
+ CreationTime = other.CreationTime;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(pb::CodedInputStream input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ switch(tag) {
+ default:
+ input.SkipLastField();
+ break;
+ case 10: {
+ if (from_ == null) {
+ from_ = new global::Akka.Remote.Serialization.Proto.Msg.AddressData();
+ }
+ input.ReadMessage(from_);
+ break;
+ }
+ case 16: {
+ SequenceNr = input.ReadInt64();
+ break;
+ }
+ case 24: {
+ CreationTime = input.ReadSInt64();
+ break;
+ }
+ }
+ }
+ }
+
+ }
+
+ ///
+ ///*
+ /// Prior to version 1.4.19
+ /// HeartbeatRsp
+ /// Sends an UniqueAddress
+ /// Version 1.4.19 can deserialize this message but does not send it
+ ///
+ internal sealed partial class HeartBeatResponse : pb::IMessage {
+ private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HeartBeatResponse());
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pb::MessageParser Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[3]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public HeartBeatResponse() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public HeartBeatResponse(HeartBeatResponse other) : this() {
+ From = other.from_ != null ? other.From.Clone() : null;
+ sequenceNr_ = other.sequenceNr_;
+ creationTime_ = other.creationTime_;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public HeartBeatResponse Clone() {
+ return new HeartBeatResponse(this);
+ }
+
+ /// Field number for the "from" field.
+ public const int FromFieldNumber = 1;
+ private global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress from_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress From {
+ get { return from_; }
+ set {
+ from_ = value;
+ }
+ }
+
+ /// Field number for the "sequenceNr" field.
+ public const int SequenceNrFieldNumber = 2;
+ private long sequenceNr_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public long SequenceNr {
+ get { return sequenceNr_; }
+ set {
+ sequenceNr_ = value;
+ }
+ }
+
+ /// Field number for the "creationTime" field.
+ public const int CreationTimeFieldNumber = 3;
+ private long creationTime_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public long CreationTime {
+ get { return creationTime_; }
+ set {
+ creationTime_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override bool Equals(object other) {
+ return Equals(other as HeartBeatResponse);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Equals(HeartBeatResponse other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if (!object.Equals(From, other.From)) return false;
+ if (SequenceNr != other.SequenceNr) return false;
+ if (CreationTime != other.CreationTime) return false;
+ return true;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override int GetHashCode() {
+ int hash = 1;
+ if (from_ != null) hash ^= From.GetHashCode();
+ if (SequenceNr != 0L) hash ^= SequenceNr.GetHashCode();
+ if (CreationTime != 0L) hash ^= CreationTime.GetHashCode();
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void WriteTo(pb::CodedOutputStream output) {
+ if (from_ != null) {
+ output.WriteRawTag(10);
+ output.WriteMessage(From);
+ }
+ if (SequenceNr != 0L) {
+ output.WriteRawTag(16);
+ output.WriteInt64(SequenceNr);
+ }
+ if (CreationTime != 0L) {
+ output.WriteRawTag(24);
+ output.WriteInt64(CreationTime);
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int CalculateSize() {
+ int size = 0;
+ if (from_ != null) {
+ size += 1 + pb::CodedOutputStream.ComputeMessageSize(From);
+ }
+ if (SequenceNr != 0L) {
+ size += 1 + pb::CodedOutputStream.ComputeInt64Size(SequenceNr);
+ }
+ if (CreationTime != 0L) {
+ size += 1 + pb::CodedOutputStream.ComputeInt64Size(CreationTime);
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(HeartBeatResponse other) {
+ if (other == null) {
+ return;
+ }
+ if (other.from_ != null) {
+ if (from_ == null) {
+ from_ = new global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress();
+ }
+ From.MergeFrom(other.From);
+ }
+ if (other.SequenceNr != 0L) {
+ SequenceNr = other.SequenceNr;
+ }
+ if (other.CreationTime != 0L) {
+ CreationTime = other.CreationTime;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(pb::CodedInputStream input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ switch(tag) {
+ default:
+ input.SkipLastField();
+ break;
+ case 10: {
+ if (from_ == null) {
+ from_ = new global::Akka.Cluster.Serialization.Proto.Msg.UniqueAddress();
+ }
+ input.ReadMessage(from_);
+ break;
+ }
+ case 16: {
+ SequenceNr = input.ReadInt64();
+ break;
+ }
+ case 24: {
+ CreationTime = input.ReadInt64();
+ break;
+ }
+ }
+ }
+ }
+
+ }
+
///
/// Gossip Envelope
///
@@ -443,7 +823,7 @@ internal sealed partial class GossipEnvelope : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[2]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[4]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -631,7 +1011,7 @@ internal sealed partial class GossipStatus : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[3]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[5]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -811,7 +1191,7 @@ internal sealed partial class Gossip : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[4]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[6]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -1071,7 +1451,7 @@ internal sealed partial class GossipOverview : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[5]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[7]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -1207,7 +1587,7 @@ internal sealed partial class ObserverReachability : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[8]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[10]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -1803,7 +2183,7 @@ internal sealed partial class VectorClock : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[9]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[11]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -2098,7 +2478,7 @@ internal sealed partial class UniqueAddress : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[10]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[12]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -2249,7 +2629,7 @@ internal sealed partial class ClusterRouterPool : pb::IMessage {
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[12]; }
+ get { return global::Akka.Cluster.Serialization.Proto.Msg.ClusterMessagesReflection.Descriptor.MessageTypes[14]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -2579,7 +2959,7 @@ internal sealed partial class ClusterRouterPoolSettings : pb::IMessage