Skip to content

Commit

Permalink
close akkadotnet#3830 - added serialization support for ClusterShardi…
Browse files Browse the repository at this point in the history
…ngStats
  • Loading branch information
Aaronontheweb committed Jun 26, 2019
1 parent b03abc5 commit ad562b4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serialize_StartEnti
public void ClusterShardingMessageSerializer_must_serialize_ClusterShardingStats()
{
CheckSerialization(new GetClusterShardingStats(TimeSpan.FromMilliseconds(500)));
CheckSerialization(new ClusterShardingStats(ImmutableDictionary<Address, ShardRegionStats>.Empty.Add(Address.AllSystems,
CheckSerialization(new ClusterShardingStats(ImmutableDictionary<Address, ShardRegionStats>.Empty.Add(new Address("akka.tcp", "foo", "localhost", 9110),
new ShardRegionStats(ImmutableDictionary<string, int>.Empty.Add("f", 12)))));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Remote.Serialization.Proto.Msg;
using Akka.Serialization;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ActorRefMessage = Akka.Remote.Serialization.Proto.Msg.ActorRefData;

namespace Akka.Cluster.Sharding.Serialization
Expand Down Expand Up @@ -59,6 +62,9 @@ public class ClusterShardingMessageSerializer : SerializerWithStringManifest
private const string GetShardRegionStatsManifest = "DC";
private const string ShardRegionStatsManifest = "DD";

private const string GetClusterShardingStatsManifest = "GS";
private const string ClusterShardingStatsManifest = "CS";

#endregion

private readonly Dictionary<string, Func<byte[], object>> _fromBinaryMap;
Expand All @@ -71,9 +77,9 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{
_fromBinaryMap = new Dictionary<string, Func<byte[], object>>
{
{EntityStateManifest, EntityStateFromBinary},
{EntityStartedManifest, EntityStartedFromBinary},
{EntityStoppedManifest, EntityStoppedFromBinary},
{EntityStateManifest, bytes => EntityStateFromBinary(bytes) },
{EntityStartedManifest, bytes => EntityStartedFromBinary(bytes) },
{EntityStoppedManifest, bytes => EntityStoppedFromBinary(bytes) },

{CoordinatorStateManifest, CoordinatorStateFromBinary},
{ShardRegionRegisteredManifest, bytes => new PersistentShardCoordinator.ShardRegionRegistered(ActorRefMessageFromBinary(bytes)) },
Expand All @@ -87,7 +93,7 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{RegisterProxyManifest, bytes => new PersistentShardCoordinator.RegisterProxy(ActorRefMessageFromBinary(bytes)) },
{RegisterAckManifest, bytes => new PersistentShardCoordinator.RegisterAck(ActorRefMessageFromBinary(bytes)) },
{GetShardHomeManifest, bytes => new PersistentShardCoordinator.GetShardHome(ShardIdMessageFromBinary(bytes)) },
{ShardHomeManifest, ShardHomeFromBinary},
{ShardHomeManifest, bytes => ShardHomeFromBinary(bytes) },
{HostShardManifest, bytes => new PersistentShardCoordinator.HostShard(ShardIdMessageFromBinary(bytes)) },
{ShardStartedManifest, bytes => new PersistentShardCoordinator.ShardStarted(ShardIdMessageFromBinary(bytes)) },
{BeginHandOffManifest, bytes => new PersistentShardCoordinator.BeginHandOff(ShardIdMessageFromBinary(bytes)) },
Expand All @@ -97,12 +103,14 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{GracefulShutdownReqManifest, bytes => new PersistentShardCoordinator.GracefulShutdownRequest(ActorRefMessageFromBinary(bytes)) },

{GetShardStatsManifest, bytes => Shard.GetShardStats.Instance },
{ShardStatsManifest, ShardStatsFromBinary},
{ShardStatsManifest, bytes => ShardStatsFromBinary(bytes) },
{ GetShardRegionStatsManifest, bytes => GetShardRegionStats.Instance },
{ ShardRegionStatsManifest, bytes => ShardRegionStatsFromBinary(bytes) },
{ GetClusterShardingStatsManifest, bytes => GetClusterShardingStatsFromBinary(bytes) },
{ ClusterShardingStatsManifest, bytes => ClusterShardingStatsFromBinary(bytes) },

{StartEntityManifest, StartEntityFromBinary },
{StartEntityAckManifest, StartEntityAckFromBinary}
{StartEntityManifest, bytes => StartEntityFromBinary(bytes) },
{StartEntityAckManifest, bytes => StartEntityAckFromBinary(bytes) }
};
}

Expand Down Expand Up @@ -146,6 +154,8 @@ public override byte[] ToBinary(object obj)
case Shard.ShardStats o: return ShardStatsToProto(o).ToByteArray();
case GetShardRegionStats o: return new byte[0];
case ShardRegionStats o: return ShardRegionStatsToProto(o).ToByteArray();
case GetClusterShardingStats o: return GetClusterShardingStatsToProto(o).ToByteArray();
case ClusterShardingStats o: return ClusterShardingStatsToProto(o).ToByteArray();
}
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -210,6 +220,8 @@ public override string Manifest(object o)
case Shard.ShardStats _: return ShardStatsManifest;
case GetShardRegionStats _: return GetShardRegionStatsManifest;
case ShardRegionStats _: return ShardRegionStatsManifest;
case GetClusterShardingStats _: return GetClusterShardingStatsManifest;
case ClusterShardingStats _: return ClusterShardingStatsManifest;
}
throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -422,6 +434,62 @@ private static ShardRegionStats ShardRegionStatsFromBinary(byte[] b)
var p = Proto.Msg.ShardRegionStats.Parser.ParseFrom(b);
return new ShardRegionStats(p.Stats.ToImmutableDictionary());
}

// GetClusterShardingStats
private static Proto.Msg.GetClusterShardingStats GetClusterShardingStatsToProto(GetClusterShardingStats stats)
{
var p = new Proto.Msg.GetClusterShardingStats();
p.Timeout = Duration.FromTimeSpan(stats.Timeout);
return p;
}

private static GetClusterShardingStats GetClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.GetClusterShardingStats.Parser.ParseFrom(b);
return new GetClusterShardingStats(p.Timeout.ToTimeSpan());
}

// ClusterShardingStats
private static Proto.Msg.ClusterShardingStats ClusterShardingStatsToProto(ClusterShardingStats stats)
{
var p = new Proto.Msg.ClusterShardingStats();
foreach (var s in stats.Regions)
{
p.Regions.Add(new ShardRegionWithAddress() { NodeAddress = AddressToProto(s.Key), Stats = ShardRegionStatsToProto(s.Value)});
}

return p;
}

private static ClusterShardingStats ClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ClusterShardingStats.Parser.ParseFrom(b);
var dict = new Dictionary<Address, ShardRegionStats>();
foreach (var s in p.Regions)
{
dict[AddressFrom(s.NodeAddress)] = new ShardRegionStats(s.Stats.Stats.ToImmutableDictionary());
}
return new ClusterShardingStats(dict.ToImmutableDictionary());
}

private static AddressData AddressToProto(Address address)
{
var message = new AddressData();
message.System = address.System;
message.Hostname = address.Host;
message.Port = (uint)(address.Port ?? 0);
message.Protocol = address.Protocol;
return message;
}

private static Address AddressFrom(AddressData addressProto)
{
return new Address(
addressProto.Protocol,
addressProto.System,
addressProto.Hostname,
addressProto.Port == 0 ? null : (int?)addressProto.Port);
}

private static string ShardIdMessageFromBinary(byte[] bytes)
{
Expand Down
8 changes: 4 additions & 4 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public CurrentRegions(IImmutableSet<Address> regions)
/// the state of the shard regions.
/// </summary>
[Serializable]
public sealed class GetClusterShardingStats : IShardRegionQuery
public sealed class GetClusterShardingStats : IShardRegionQuery, IClusterShardingSerializable
{
/// <summary>
/// The timeout for this operation.
Expand Down Expand Up @@ -178,7 +178,7 @@ public override int GetHashCode()
/// Reply to <see cref="GetClusterShardingStats"/>, contains statistics about all the sharding regions in the cluster.
/// </summary>
[Serializable]
public sealed class ClusterShardingStats
public sealed class ClusterShardingStats : IClusterShardingSerializable
{
/// <summary>
/// All of the statistics for a specific shard region organized per-node.
Expand Down Expand Up @@ -221,7 +221,7 @@ public override int GetHashCode()
/// For the statistics for the entire cluster, see <see cref="GetClusterShardingStats"/>.
/// </summary>
[Serializable]
public sealed class GetShardRegionStats : IShardRegionQuery
public sealed class GetShardRegionStats : IShardRegionQuery, IClusterShardingSerializable
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -277,7 +277,7 @@ public CurrentShardRegionState(IImmutableSet<ShardState> shards)
/// Entity allocation statistics for a specific shard region.
/// </summary>
[Serializable]
public sealed class ShardRegionStats
public sealed class ShardRegionStats : IClusterShardingSerializable
{
/// <summary>
/// The set of shardId / entity count pairs
Expand Down

0 comments on commit ad562b4

Please sign in to comment.