Skip to content

Commit

Permalink
ShardStats and ClusterShardingStats serialization fix (#3832)
Browse files Browse the repository at this point in the history
* close #3830 - added serialization support for ClusterShardingStats
  • Loading branch information
Aaronontheweb authored Jul 12, 2019
1 parent 4fd2cb8 commit 04a5eb2
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Sharding.Serialization;
Expand Down Expand Up @@ -122,12 +123,14 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serializable_Persis
public void ClusterShardingMessageSerializer_must_be_able_to_serializable_GetShardStats()
{
CheckSerialization(Shard.GetShardStats.Instance);
CheckSerialization(GetShardRegionStats.Instance);
}

[Fact]
public void ClusterShardingMessageSerializer_must_be_able_to_serializable_ShardStats()
{
CheckSerialization(new Shard.ShardStats("a", 23));
CheckSerialization(new ShardRegionStats(ImmutableDictionary<string, int>.Empty.Add("f", 12)));
}

[Fact]
Expand All @@ -136,5 +139,13 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serialize_StartEnti
CheckSerialization(new ShardRegion.StartEntity("42"));
CheckSerialization(new ShardRegion.StartEntityAck("13", "37"));
}

[Fact]
public void ClusterShardingMessageSerializer_must_serialize_ClusterShardingStats()
{
CheckSerialization(new GetClusterShardingStats(TimeSpan.FromMilliseconds(500)));
CheckSerialization(new ClusterShardingStats(ImmutableDictionary<Address, ShardRegionStats>.Empty.Add(new Address("akka.tcp", "foo", "localhost", 9110),
new ShardRegionStats(ImmutableDictionary<string, int>.Empty.Add("f", 12)))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Remote.Serialization.Proto.Msg;
using Akka.Serialization;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ActorRefMessage = Akka.Remote.Serialization.Proto.Msg.ActorRefData;

namespace Akka.Cluster.Sharding.Serialization
{
/// <summary>
/// TBD
/// INTERNAL API: Protobuf serializer of Cluster.Sharding messages.
/// </summary>
public class ClusterShardingMessageSerializer : SerializerWithStringManifest
{
private static readonly byte[] Empty = new byte[0];

#region manifests

private const string CoordinatorStateManifest = "AA";
Expand Down Expand Up @@ -56,6 +61,11 @@ public class ClusterShardingMessageSerializer : SerializerWithStringManifest

private const string GetShardStatsManifest = "DA";
private const string ShardStatsManifest = "DB";
private const string GetShardRegionStatsManifest = "DC";
private const string ShardRegionStatsManifest = "DD";

private const string GetClusterShardingStatsManifest = "GS";
private const string ClusterShardingStatsManifest = "CS";

#endregion

Expand All @@ -69,9 +79,9 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{
_fromBinaryMap = new Dictionary<string, Func<byte[], object>>
{
{EntityStateManifest, EntityStateFromBinary},
{EntityStartedManifest, EntityStartedFromBinary},
{EntityStoppedManifest, EntityStoppedFromBinary},
{EntityStateManifest, bytes => EntityStateFromBinary(bytes) },
{EntityStartedManifest, bytes => EntityStartedFromBinary(bytes) },
{EntityStoppedManifest, bytes => EntityStoppedFromBinary(bytes) },

{CoordinatorStateManifest, CoordinatorStateFromBinary},
{ShardRegionRegisteredManifest, bytes => new PersistentShardCoordinator.ShardRegionRegistered(ActorRefMessageFromBinary(bytes)) },
Expand All @@ -85,7 +95,7 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{RegisterProxyManifest, bytes => new PersistentShardCoordinator.RegisterProxy(ActorRefMessageFromBinary(bytes)) },
{RegisterAckManifest, bytes => new PersistentShardCoordinator.RegisterAck(ActorRefMessageFromBinary(bytes)) },
{GetShardHomeManifest, bytes => new PersistentShardCoordinator.GetShardHome(ShardIdMessageFromBinary(bytes)) },
{ShardHomeManifest, ShardHomeFromBinary},
{ShardHomeManifest, bytes => ShardHomeFromBinary(bytes) },
{HostShardManifest, bytes => new PersistentShardCoordinator.HostShard(ShardIdMessageFromBinary(bytes)) },
{ShardStartedManifest, bytes => new PersistentShardCoordinator.ShardStarted(ShardIdMessageFromBinary(bytes)) },
{BeginHandOffManifest, bytes => new PersistentShardCoordinator.BeginHandOff(ShardIdMessageFromBinary(bytes)) },
Expand All @@ -95,10 +105,14 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{GracefulShutdownReqManifest, bytes => new PersistentShardCoordinator.GracefulShutdownRequest(ActorRefMessageFromBinary(bytes)) },

{GetShardStatsManifest, bytes => Shard.GetShardStats.Instance },
{ShardStatsManifest, ShardStatsFromBinary},

{StartEntityManifest, StartEntityFromBinary },
{StartEntityAckManifest, StartEntityAckFromBinary}
{ShardStatsManifest, bytes => ShardStatsFromBinary(bytes) },
{ GetShardRegionStatsManifest, bytes => GetShardRegionStats.Instance },
{ ShardRegionStatsManifest, bytes => ShardRegionStatsFromBinary(bytes) },
{ GetClusterShardingStatsManifest, bytes => GetClusterShardingStatsFromBinary(bytes) },
{ ClusterShardingStatsManifest, bytes => ClusterShardingStatsFromBinary(bytes) },

{StartEntityManifest, bytes => StartEntityFromBinary(bytes) },
{StartEntityAckManifest, bytes => StartEntityAckFromBinary(bytes) }
};
}

Expand Down Expand Up @@ -138,8 +152,12 @@ public override byte[] ToBinary(object obj)
case Shard.EntityStopped o: return EntityStoppedToProto(o).ToByteArray();
case ShardRegion.StartEntity o: return StartEntityToProto(o).ToByteArray();
case ShardRegion.StartEntityAck o: return StartEntityAckToProto(o).ToByteArray();
case Shard.GetShardStats o: return new byte[0];
case Shard.GetShardStats o: return Empty;
case Shard.ShardStats o: return ShardStatsToProto(o).ToByteArray();
case GetShardRegionStats o: return Empty;
case ShardRegionStats o: return ShardRegionStatsToProto(o).ToByteArray();
case GetClusterShardingStats o: return GetClusterShardingStatsToProto(o).ToByteArray();
case ClusterShardingStats o: return ClusterShardingStatsToProto(o).ToByteArray();
}
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -202,6 +220,10 @@ public override string Manifest(object o)
case ShardRegion.StartEntityAck _: return StartEntityAckManifest;
case Shard.GetShardStats _: return GetShardStatsManifest;
case Shard.ShardStats _: return ShardStatsManifest;
case GetShardRegionStats _: return GetShardRegionStatsManifest;
case ShardRegionStats _: return ShardRegionStatsManifest;
case GetClusterShardingStats _: return GetClusterShardingStatsManifest;
case ClusterShardingStats _: return ClusterShardingStatsManifest;
}
throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -378,14 +400,14 @@ private IActorRef ActorRefMessageFromBinary(byte[] binary)
// Shard.ShardState
//

private Proto.Msg.EntityState EntityStateToProto(Shard.ShardState entityState)
private static Proto.Msg.EntityState EntityStateToProto(Shard.ShardState entityState)
{
var message = new Proto.Msg.EntityState();
message.Entities.AddRange(entityState.Entries);
return message;
}

private Shard.ShardState EntityStateFromBinary(byte[] bytes)
private static Shard.ShardState EntityStateFromBinary(byte[] bytes)
{
var msg = Proto.Msg.EntityState.Parser.ParseFrom(bytes);
return new Shard.ShardState(msg.Entities.ToImmutableHashSet());
Expand All @@ -401,7 +423,77 @@ private Proto.Msg.ShardIdMessage ShardIdMessageToProto(string shard)
return message;
}

private string ShardIdMessageFromBinary(byte[] bytes)
// ShardRegionStats
private static Proto.Msg.ShardRegionStats ShardRegionStatsToProto(ShardRegionStats s)
{
var message = new Proto.Msg.ShardRegionStats();
message.Stats.Add((IDictionary<string,int>)s.Stats);
return message;
}

private static ShardRegionStats ShardRegionStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ShardRegionStats.Parser.ParseFrom(b);
return new ShardRegionStats(p.Stats.ToImmutableDictionary());
}

// GetClusterShardingStats
private static Proto.Msg.GetClusterShardingStats GetClusterShardingStatsToProto(GetClusterShardingStats stats)
{
var p = new Proto.Msg.GetClusterShardingStats();
p.Timeout = Duration.FromTimeSpan(stats.Timeout);
return p;
}

private static GetClusterShardingStats GetClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.GetClusterShardingStats.Parser.ParseFrom(b);
return new GetClusterShardingStats(p.Timeout.ToTimeSpan());
}

// ClusterShardingStats
private static Proto.Msg.ClusterShardingStats ClusterShardingStatsToProto(ClusterShardingStats stats)
{
var p = new Proto.Msg.ClusterShardingStats();
foreach (var s in stats.Regions)
{
p.Regions.Add(new ShardRegionWithAddress() { NodeAddress = AddressToProto(s.Key), Stats = ShardRegionStatsToProto(s.Value)});
}

return p;
}

private static ClusterShardingStats ClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ClusterShardingStats.Parser.ParseFrom(b);
var dict = new Dictionary<Address, ShardRegionStats>();
foreach (var s in p.Regions)
{
dict[AddressFrom(s.NodeAddress)] = new ShardRegionStats(s.Stats.Stats.ToImmutableDictionary());
}
return new ClusterShardingStats(dict.ToImmutableDictionary());
}

private static AddressData AddressToProto(Address address)
{
var message = new AddressData();
message.System = address.System;
message.Hostname = address.Host;
message.Port = (uint)(address.Port ?? 0);
message.Protocol = address.Protocol;
return message;
}

private static Address AddressFrom(AddressData addressProto)
{
return new Address(
addressProto.Protocol,
addressProto.System,
addressProto.Hostname,
addressProto.Port == 0 ? null : (int?)addressProto.Port);
}

private static string ShardIdMessageFromBinary(byte[] bytes)
{
return Proto.Msg.ShardIdMessage.Parser.ParseFrom(bytes).Shard;
}
Expand Down
Loading

0 comments on commit 04a5eb2

Please sign in to comment.