From ad562b49aba5dd2de29ccdfdfc82284fcc732709 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 26 Jun 2019 18:20:08 -0500 Subject: [PATCH] close #3830 - added serialization support for ClusterShardingStats --- .../ClusterShardingMessageSerializerSpec.cs | 2 +- .../ClusterShardingMessageSerializer.cs | 82 +++++++++++++++++-- .../Akka.Cluster.Sharding/ShardingMessages.cs | 8 +- 3 files changed, 80 insertions(+), 12 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingMessageSerializerSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingMessageSerializerSpec.cs index edab0e1cda4..65dd5999224 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingMessageSerializerSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingMessageSerializerSpec.cs @@ -144,7 +144,7 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serialize_StartEnti public void ClusterShardingMessageSerializer_must_serialize_ClusterShardingStats() { CheckSerialization(new GetClusterShardingStats(TimeSpan.FromMilliseconds(500))); - CheckSerialization(new ClusterShardingStats(ImmutableDictionary.Empty.Add(Address.AllSystems, + CheckSerialization(new ClusterShardingStats(ImmutableDictionary.Empty.Add(new Address("akka.tcp", "foo", "localhost", 9110), new ShardRegionStats(ImmutableDictionary.Empty.Add("f", 12))))); } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs index c3628315ddd..39f3f017fad 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs @@ -13,8 +13,11 @@ using System.Linq; using System.Runtime.Serialization; using Akka.Actor; +using Akka.Cluster.Sharding.Serialization.Proto.Msg; +using Akka.Remote.Serialization.Proto.Msg; using Akka.Serialization; using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; using ActorRefMessage = Akka.Remote.Serialization.Proto.Msg.ActorRefData; namespace Akka.Cluster.Sharding.Serialization @@ -59,6 +62,9 @@ public class ClusterShardingMessageSerializer : SerializerWithStringManifest private const string GetShardRegionStatsManifest = "DC"; private const string ShardRegionStatsManifest = "DD"; + private const string GetClusterShardingStatsManifest = "GS"; + private const string ClusterShardingStatsManifest = "CS"; + #endregion private readonly Dictionary> _fromBinaryMap; @@ -71,9 +77,9 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste { _fromBinaryMap = new Dictionary> { - {EntityStateManifest, EntityStateFromBinary}, - {EntityStartedManifest, EntityStartedFromBinary}, - {EntityStoppedManifest, EntityStoppedFromBinary}, + {EntityStateManifest, bytes => EntityStateFromBinary(bytes) }, + {EntityStartedManifest, bytes => EntityStartedFromBinary(bytes) }, + {EntityStoppedManifest, bytes => EntityStoppedFromBinary(bytes) }, {CoordinatorStateManifest, CoordinatorStateFromBinary}, {ShardRegionRegisteredManifest, bytes => new PersistentShardCoordinator.ShardRegionRegistered(ActorRefMessageFromBinary(bytes)) }, @@ -87,7 +93,7 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste {RegisterProxyManifest, bytes => new PersistentShardCoordinator.RegisterProxy(ActorRefMessageFromBinary(bytes)) }, {RegisterAckManifest, bytes => new PersistentShardCoordinator.RegisterAck(ActorRefMessageFromBinary(bytes)) }, {GetShardHomeManifest, bytes => new PersistentShardCoordinator.GetShardHome(ShardIdMessageFromBinary(bytes)) }, - {ShardHomeManifest, ShardHomeFromBinary}, + {ShardHomeManifest, bytes => ShardHomeFromBinary(bytes) }, {HostShardManifest, bytes => new PersistentShardCoordinator.HostShard(ShardIdMessageFromBinary(bytes)) }, {ShardStartedManifest, bytes => new PersistentShardCoordinator.ShardStarted(ShardIdMessageFromBinary(bytes)) }, {BeginHandOffManifest, bytes => new PersistentShardCoordinator.BeginHandOff(ShardIdMessageFromBinary(bytes)) }, @@ -97,12 +103,14 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste {GracefulShutdownReqManifest, bytes => new PersistentShardCoordinator.GracefulShutdownRequest(ActorRefMessageFromBinary(bytes)) }, {GetShardStatsManifest, bytes => Shard.GetShardStats.Instance }, - {ShardStatsManifest, ShardStatsFromBinary}, + {ShardStatsManifest, bytes => ShardStatsFromBinary(bytes) }, { GetShardRegionStatsManifest, bytes => GetShardRegionStats.Instance }, { ShardRegionStatsManifest, bytes => ShardRegionStatsFromBinary(bytes) }, + { GetClusterShardingStatsManifest, bytes => GetClusterShardingStatsFromBinary(bytes) }, + { ClusterShardingStatsManifest, bytes => ClusterShardingStatsFromBinary(bytes) }, - {StartEntityManifest, StartEntityFromBinary }, - {StartEntityAckManifest, StartEntityAckFromBinary} + {StartEntityManifest, bytes => StartEntityFromBinary(bytes) }, + {StartEntityAckManifest, bytes => StartEntityAckFromBinary(bytes) } }; } @@ -146,6 +154,8 @@ public override byte[] ToBinary(object obj) case Shard.ShardStats o: return ShardStatsToProto(o).ToByteArray(); case GetShardRegionStats o: return new byte[0]; case ShardRegionStats o: return ShardRegionStatsToProto(o).ToByteArray(); + case GetClusterShardingStats o: return GetClusterShardingStatsToProto(o).ToByteArray(); + case ClusterShardingStats o: return ClusterShardingStatsToProto(o).ToByteArray(); } throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{this.GetType()}]"); } @@ -210,6 +220,8 @@ public override string Manifest(object o) case Shard.ShardStats _: return ShardStatsManifest; case GetShardRegionStats _: return GetShardRegionStatsManifest; case ShardRegionStats _: return ShardRegionStatsManifest; + case GetClusterShardingStats _: return GetClusterShardingStatsManifest; + case ClusterShardingStats _: return ClusterShardingStatsManifest; } throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{this.GetType()}]"); } @@ -422,6 +434,62 @@ private static ShardRegionStats ShardRegionStatsFromBinary(byte[] b) var p = Proto.Msg.ShardRegionStats.Parser.ParseFrom(b); return new ShardRegionStats(p.Stats.ToImmutableDictionary()); } + + // GetClusterShardingStats + private static Proto.Msg.GetClusterShardingStats GetClusterShardingStatsToProto(GetClusterShardingStats stats) + { + var p = new Proto.Msg.GetClusterShardingStats(); + p.Timeout = Duration.FromTimeSpan(stats.Timeout); + return p; + } + + private static GetClusterShardingStats GetClusterShardingStatsFromBinary(byte[] b) + { + var p = Proto.Msg.GetClusterShardingStats.Parser.ParseFrom(b); + return new GetClusterShardingStats(p.Timeout.ToTimeSpan()); + } + + // ClusterShardingStats + private static Proto.Msg.ClusterShardingStats ClusterShardingStatsToProto(ClusterShardingStats stats) + { + var p = new Proto.Msg.ClusterShardingStats(); + foreach (var s in stats.Regions) + { + p.Regions.Add(new ShardRegionWithAddress() { NodeAddress = AddressToProto(s.Key), Stats = ShardRegionStatsToProto(s.Value)}); + } + + return p; + } + + private static ClusterShardingStats ClusterShardingStatsFromBinary(byte[] b) + { + var p = Proto.Msg.ClusterShardingStats.Parser.ParseFrom(b); + var dict = new Dictionary(); + foreach (var s in p.Regions) + { + dict[AddressFrom(s.NodeAddress)] = new ShardRegionStats(s.Stats.Stats.ToImmutableDictionary()); + } + return new ClusterShardingStats(dict.ToImmutableDictionary()); + } + + private static AddressData AddressToProto(Address address) + { + var message = new AddressData(); + message.System = address.System; + message.Hostname = address.Host; + message.Port = (uint)(address.Port ?? 0); + message.Protocol = address.Protocol; + return message; + } + + private static Address AddressFrom(AddressData addressProto) + { + return new Address( + addressProto.Protocol, + addressProto.System, + addressProto.Hostname, + addressProto.Port == 0 ? null : (int?)addressProto.Port); + } private static string ShardIdMessageFromBinary(byte[] bytes) { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs index f13c057bd04..f4dde35c52d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs @@ -142,7 +142,7 @@ public CurrentRegions(IImmutableSet
regions) /// the state of the shard regions. /// [Serializable] - public sealed class GetClusterShardingStats : IShardRegionQuery + public sealed class GetClusterShardingStats : IShardRegionQuery, IClusterShardingSerializable { /// /// The timeout for this operation. @@ -178,7 +178,7 @@ public override int GetHashCode() /// Reply to , contains statistics about all the sharding regions in the cluster. /// [Serializable] - public sealed class ClusterShardingStats + public sealed class ClusterShardingStats : IClusterShardingSerializable { /// /// All of the statistics for a specific shard region organized per-node. @@ -221,7 +221,7 @@ public override int GetHashCode() /// For the statistics for the entire cluster, see . /// [Serializable] - public sealed class GetShardRegionStats : IShardRegionQuery + public sealed class GetShardRegionStats : IShardRegionQuery, IClusterShardingSerializable { /// /// TBD @@ -277,7 +277,7 @@ public CurrentShardRegionState(IImmutableSet shards) /// Entity allocation statistics for a specific shard region. /// [Serializable] - public sealed class ShardRegionStats + public sealed class ShardRegionStats : IClusterShardingSerializable { /// /// The set of shardId / entity count pairs