Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix serialize-messages for Akka.Cluster and Akka.Remote #3725

53 changes: 53 additions & 0 deletions src/core/Akka.Cluster.Tests/Serialization/BugFix3724Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//-----------------------------------------------------------------------
// <copyright file="BugFix3724Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests.Serialization
{
/// <summary>
/// https://github.com/akkadotnet/akka.net/issues/3724
/// Used to validate that `akka.actor.serialize-messages = on` works while
/// using Akka.Cluster
/// </summary>
public class BugFix3724Spec : AkkaSpec
{
public BugFix3724Spec(ITestOutputHelper helper)
: base(@"akka.actor.provider = cluster
akka.actor.serialize-messages = on", helper)
{
_cluster = Cluster.Get(Sys);
_selfAddress = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress;
}

private readonly Address _selfAddress;
private readonly Cluster _cluster;

[Fact(DisplayName = "Should be able to use 'akka.actor.serialize-messages' while running Akka.Cluster")]
public void Should_serialize_all_AkkaCluster_messages()
{
_cluster.Subscribe(TestActor, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
typeof(ClusterEvent.MemberUp));
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter.Exception<Exception>().Expect(0, () =>
{
// wait for a singleton cluster to fully form and publish a member up event
_cluster.Join(_selfAddress);
var up = ExpectMsg<ClusterEvent.MemberUp>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case here is simple: test that end-to-end, the Akka.Cluster self-join process works and all manner of internal event publication goes off without a hitch. Caught two different major serialization errors with this and fixed both.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the MemberUp event makes it all the way to this actor, we can safely assume that the messages necessary to run Akka.Cluster's internals made it intact.

up.Member.Address.Should().Be(_selfAddress);
});
});
}
}
}
91 changes: 48 additions & 43 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ internal sealed class InternalClusterAction
/// </summary>
internal sealed class Join : IClusterMessage
{
readonly UniqueAddress _node;
readonly ImmutableHashSet<string> _roles;
private readonly UniqueAddress _node;
private readonly ImmutableHashSet<string> _roles;

/// <summary>
/// TBD
Expand Down Expand Up @@ -192,8 +192,8 @@ public override string ToString()
/// </summary>
internal sealed class Welcome : IClusterMessage
{
readonly UniqueAddress _from;
readonly Gossip _gossip;
private readonly UniqueAddress _from;
private readonly Gossip _gossip;

/// <summary>
/// TBD
Expand Down Expand Up @@ -246,7 +246,7 @@ public override int GetHashCode()
/// </summary>
internal sealed class JoinSeedNodes : IDeadLetterSuppression
{
readonly ImmutableList<Address> _seedNodes;
private readonly ImmutableList<Address> _seedNodes;

/// <summary>
/// Creates a new instance of the command.
Expand Down Expand Up @@ -291,7 +291,7 @@ public override bool Equals(object obj)
/// <inheritdoc cref="JoinSeenNode"/>
internal sealed class InitJoinAck : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -334,7 +334,7 @@ public override int GetHashCode()
/// <inheritdoc cref="JoinSeenNode"/>
internal sealed class InitJoinNack : IClusterMessage, IDeadLetterSuppression
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -545,7 +545,7 @@ public static PublishStatsTick Instance
/// </summary>
internal sealed class SendGossipTo
{
readonly Address _address;
private readonly Address _address;

/// <summary>
/// TBD
Expand Down Expand Up @@ -649,9 +649,9 @@ public interface ISubscriptionMessage { }
/// </summary>
public sealed class Subscribe : ISubscriptionMessage
{
readonly IActorRef _subscriber;
readonly ClusterEvent.SubscriptionInitialStateMode _initialStateMode;
readonly ImmutableHashSet<Type> _to;
private readonly IActorRef _subscriber;
private readonly ClusterEvent.SubscriptionInitialStateMode _initialStateMode;
private readonly ImmutableHashSet<Type> _to;

/// <summary>
/// Creates a new subscription
Expand Down Expand Up @@ -697,8 +697,8 @@ public ImmutableHashSet<Type> To
/// </summary>
public sealed class Unsubscribe : ISubscriptionMessage, IDeadLetterSuppression
{
readonly IActorRef _subscriber;
readonly Type _to;
private readonly IActorRef _subscriber;
private readonly Type _to;

/// <summary>
/// TBD
Expand Down Expand Up @@ -733,7 +733,7 @@ public Type To
/// </summary>
public sealed class SendCurrentClusterState : ISubscriptionMessage
{
readonly IActorRef _receiver;
private readonly IActorRef _receiver;

/// <summary>
/// TBD
Expand All @@ -754,41 +754,46 @@ public SendCurrentClusterState(IActorRef receiver)
}

/// <summary>
/// TBD
/// INTERNAL API.
///
/// Marker interface for publication events from Akka.Cluster.
/// </summary>
interface IPublishMessage { }
/// <remarks>
/// <see cref="INoSerializationVerificationNeeded"/> is not explicitly used on the JVM,
/// but without it we run into serialization issues via https://github.com/akkadotnet/akka.net/issues/3724
/// </remarks>
private interface IPublishMessage : INoSerializationVerificationNeeded { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First fix: made it so IPublishMessage doesn't get checked by akka.actor.serialize-messages - did this because designating a single Gossip class constructor with JsonConstructor wasn't going to work due to the fact that we make use of all three different constructors in different situations. The JVM gets away with this by using the built-in Serializable trait in Scala, which is roughly equivalent to our binary formatter, which .NET no longer has in .NET Core. So, I figured this would be the best substitute here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/// <summary>
/// TBD
/// INTERNAL API.
///
/// Used to publish Gossip and Membership changes inside Akka.Cluster.
/// </summary>
internal sealed class PublishChanges : IPublishMessage
{
readonly Gossip _newGossip;

/// <summary>
/// TBD
/// Creates a new <see cref="PublishChanges"/> message with updated gossip.
/// </summary>
/// <param name="newGossip">TBD</param>
/// <param name="newGossip">The gossip to publish internally.</param>
internal PublishChanges(Gossip newGossip)
{
_newGossip = newGossip;
NewGossip = newGossip;
}

/// <summary>
/// TBD
/// The gossip being published.
/// </summary>
public Gossip NewGossip
{
get { return _newGossip; }
}
public Gossip NewGossip { get; }
}

/// <summary>
/// TBD
/// INTERNAL API.
///
/// Used to publish events out to the cluster.
/// </summary>
internal sealed class PublishEvent : IPublishMessage
{
readonly ClusterEvent.IClusterDomainEvent _event;
private readonly ClusterEvent.IClusterDomainEvent _event;

/// <summary>
/// TBD
Expand Down Expand Up @@ -991,15 +996,15 @@ internal static string VclockName(UniqueAddress node)

// note that self is not initially member,
// and the SendGossip is not versioned for this 'Node' yet
Gossip _latestGossip = Gossip.Empty;
private Gossip _latestGossip = Gossip.Empty;

readonly bool _statsEnabled;
private readonly bool _statsEnabled;
private GossipStats _gossipStats = new GossipStats();
private ImmutableList<Address> _seedNodes;
private IActorRef _seedNodeProcess;
private int _seedNodeProcessCounter = 0; //for unique names

readonly IActorRef _publisher;
private readonly IActorRef _publisher;
private int _leaderActionCounter = 0;
private int _selfDownCounter = 0;

Expand Down Expand Up @@ -1093,15 +1098,15 @@ private void AddCoordinatedLeave()
});
}

ActorSelection ClusterCore(Address address)
private ActorSelection ClusterCore(Address address)
{
return Context.ActorSelection(new RootActorPath(address) / "system" / "cluster" / "core" / "daemon");
}

readonly ICancelable _gossipTaskCancellable;
readonly ICancelable _failureDetectorReaperTaskCancellable;
readonly ICancelable _leaderActionsTaskCancellable;
readonly ICancelable _publishStatsTaskTaskCancellable;
private readonly ICancelable _gossipTaskCancellable;
private readonly ICancelable _failureDetectorReaperTaskCancellable;
private readonly ICancelable _leaderActionsTaskCancellable;
private readonly ICancelable _publishStatsTaskTaskCancellable;

/// <inheritdoc cref="ActorBase.PreStart"/>
protected override void PreStart()
Expand Down Expand Up @@ -2583,7 +2588,7 @@ public void PublishInternalStats()
_publisher.Tell(new ClusterEvent.CurrentInternalStats(_gossipStats, vclockStats));
}

readonly ILoggingAdapter _log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();
}

/// <summary>
Expand Down Expand Up @@ -2706,13 +2711,13 @@ private void Done(object message)
/// </summary>
internal sealed class FirstSeedNodeProcess : UntypedActor
{
readonly ILoggingAdapter _log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();

private ImmutableList<Address> _remainingSeeds;
readonly Address _selfAddress;
readonly Cluster _cluster;
readonly Deadline _timeout;
readonly ICancelable _retryTaskToken;
private readonly Address _selfAddress;
private readonly Cluster _cluster;
private readonly Deadline _timeout;
private readonly ICancelable _retryTaskToken;

/// <summary>
/// TBD
Expand Down
15 changes: 15 additions & 0 deletions src/core/Akka.Cluster/Member.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using Akka.Actor;
using Akka.Util.Internal;
using Newtonsoft.Json;

namespace Akka.Cluster
{
Expand Down Expand Up @@ -92,6 +93,20 @@ internal Member(UniqueAddress uniqueAddress, int upNumber, MemberStatus status,
Roles = roles;
}

/// <summary>
/// Used when `akka.actor.serialize-messages = on`.
/// </summary>
/// <param name="uniqueAddress">The address of the member.</param>
/// <param name="upNumber">The upNumber of the member, as assigned by the leader at the time the node joined the cluster.</param>
/// <param name="status">The status of this member.</param>
/// <param name="roles">The roles for this member. Can be empty.</param>
[JsonConstructor]
internal Member(UniqueAddress uniqueAddress, int upNumber, MemberStatus status, IEnumerable<string> roles)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with serializing the Member class is an old, but familiar one: JSON.NET is unable to serialize the System.Collections.Immutable.ImmutableHashSet<string> taken for the roles parameter on the normal constructor this actor uses. Thus, I created a second internal constructor that just takes an IEnumerable<string> instead and marked that as the JsonConstructor - ideally once Hyperion is done we will no longer need this.

: this(uniqueAddress, upNumber, status, roles.ToImmutableHashSet())
{

}

/// <summary>
/// The <see cref="Address"/> for this member.
/// </summary>
Expand Down