Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

porting Cluster heartbeat timings, hardened Akka.Cluster serialization #4934

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,12 @@ namespace Akka.Cluster.SBR
}
namespace Akka.Cluster.Serialization
{
public class ClusterMessageSerializer : Akka.Serialization.Serializer
[Akka.Annotations.InternalApiAttribute()]
public class ClusterMessageSerializer : Akka.Serialization.SerializerWithStringManifest
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So even though this is part of the public API, I don't think this is much of a "breaking change" as the serializer isn't called directly from user code (that I'm aware of) and it's really meant to be an internal api.

{
public ClusterMessageSerializer(Akka.Actor.ExtendedActorSystem system) { }
public override bool IncludeManifest { get; }
public override object FromBinary(byte[] bytes, System.Type type) { }
public override object FromBinary(byte[] bytes, string manifest) { }
public override string Manifest(object o) { }
public override byte[] ToBinary(object obj) { }
}
}
35 changes: 35 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatReceiverSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
{
public static Config Config = @"akka.actor.provider = cluster";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
{

}

[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
}
}
66 changes: 66 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatSenderSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Immutable;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
{
_probe = probe;
}

protected override void PreStart()
{
// don't register for cluster events
}

protected override ActorSelection HeartbeatReceiver(Address address)
{
return Context.ActorSelection(_probe.Ref.Path);
}
}

public static readonly Config Config = @"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }

[Fact]
public void ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
{
var probe = CreateTestProbe();
var underTest = Sys.ActorOf(Props.Create(() => new TestClusterHeartbeatSender(probe)));

underTest.Tell(new ClusterEvent.CurrentClusterState());
underTest.Tell(new ClusterEvent.MemberUp(new Member(
new UniqueAddress(new Address("akka", Sys.Name), 1), 1,
MemberStatus.Up, ImmutableHashSet<string>.Empty, AppVersion.Zero)));

probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(1L);
probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(2L);
}
}
}
10 changes: 7 additions & 3 deletions src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -56,9 +57,12 @@ protected void AwaitUp()
/// </summary>
protected void Join(string expected)
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
});
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Cluster.Serialization;
using Akka.Routing;
using Akka.Serialization;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using Akka.Util;
using Google.Protobuf;

namespace Akka.Cluster.Tests.Serialization
{
Expand All @@ -33,19 +36,47 @@ public ClusterMessageSerializerSpec()
public void Can_serialize_Heartbeat()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var message = new ClusterHeartbeatSender.Heartbeat(address);
var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_Hearbeatv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
}

[Fact]
public void Can_serialize_HeartbeatRsp()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var uniqueAddress = new UniqueAddress(address, 17);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_HearbeatRspv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
}

[Fact]
public void Can_serialize_GossipEnvelope()
{
Expand Down Expand Up @@ -191,6 +222,7 @@ private T AssertAndReturn<T>(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
serializer.Should().BeOfType<ClusterMessageSerializer>();
return serializer.FromBinary<T>(serialized);
}

Expand Down
26 changes: 22 additions & 4 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,7 @@ internal void Shutdown()
LogInfo("Shutting down...");
System.Stop(_clusterDaemons);

if (_readView != null)
{
_readView.Dispose();
}
_readView?.Dispose();

LogInfo("Successfully shut down");
}
Expand Down Expand Up @@ -583,6 +580,27 @@ public InfoLogger(ILoggingAdapter log, ClusterSettings settings, Address selfAdd
_selfAddress = selfAddress;
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific message.
/// </summary>
/// <param name="message">The message being logged.</param>
internal void LogDebug(string message)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{0}] - {1}", _selfAddress, message);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific template and arguments.
/// </summary>
/// <param name="template">The template being rendered and logged.</param>
/// <param name="arg1">The argument that fills in the template placeholder.</param>
internal void LogDebug(string template, object arg1)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{1}] - " + template, arg1, _selfAddress);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.InfoLevel"/> log entry with the specific message.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private void CreateChildren()
{
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(Props.Create<ClusterHeartbeatReceiver>(), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down
Loading