Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ShardStats and ClusterShardingStats serialization fix #3832

Merged
merged 13 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Sharding.Serialization;
Expand Down Expand Up @@ -122,12 +123,14 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serializable_Persis
public void ClusterShardingMessageSerializer_must_be_able_to_serializable_GetShardStats()
{
CheckSerialization(Shard.GetShardStats.Instance);
CheckSerialization(GetShardRegionStats.Instance);
}

[Fact]
public void ClusterShardingMessageSerializer_must_be_able_to_serializable_ShardStats()
{
CheckSerialization(new Shard.ShardStats("a", 23));
CheckSerialization(new ShardRegionStats(ImmutableDictionary<string, int>.Empty.Add("f", 12)));
}

[Fact]
Expand All @@ -136,5 +139,13 @@ public void ClusterShardingMessageSerializer_must_be_able_to_serialize_StartEnti
CheckSerialization(new ShardRegion.StartEntity("42"));
CheckSerialization(new ShardRegion.StartEntityAck("13", "37"));
}

[Fact]
public void ClusterShardingMessageSerializer_must_serialize_ClusterShardingStats()
{
CheckSerialization(new GetClusterShardingStats(TimeSpan.FromMilliseconds(500)));
CheckSerialization(new ClusterShardingStats(ImmutableDictionary<Address, ShardRegionStats>.Empty.Add(new Address("akka.tcp", "foo", "localhost", 9110),
new ShardRegionStats(ImmutableDictionary<string, int>.Empty.Add("f", 12)))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Remote.Serialization.Proto.Msg;
using Akka.Serialization;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ActorRefMessage = Akka.Remote.Serialization.Proto.Msg.ActorRefData;

namespace Akka.Cluster.Sharding.Serialization
{
/// <summary>
/// TBD
/// INTERNAL API: Protobuf serializer of Cluster.Sharding messages.
/// </summary>
public class ClusterShardingMessageSerializer : SerializerWithStringManifest
{
Expand Down Expand Up @@ -56,6 +59,11 @@ public class ClusterShardingMessageSerializer : SerializerWithStringManifest

private const string GetShardStatsManifest = "DA";
private const string ShardStatsManifest = "DB";
private const string GetShardRegionStatsManifest = "DC";
private const string ShardRegionStatsManifest = "DD";

private const string GetClusterShardingStatsManifest = "GS";
private const string ClusterShardingStatsManifest = "CS";

#endregion

Expand All @@ -69,9 +77,9 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{
_fromBinaryMap = new Dictionary<string, Func<byte[], object>>
{
{EntityStateManifest, EntityStateFromBinary},
{EntityStartedManifest, EntityStartedFromBinary},
{EntityStoppedManifest, EntityStoppedFromBinary},
{EntityStateManifest, bytes => EntityStateFromBinary(bytes) },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer using method straight away instead of lambda syntax - I don't know it lambda syntax will trigger generation of hidden class representing it, but this way I'm sure I don't give a compiler reasons to think it should try so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing serializer does a mix of both already and these messages aren't sent very often.

{EntityStartedManifest, bytes => EntityStartedFromBinary(bytes) },
{EntityStoppedManifest, bytes => EntityStoppedFromBinary(bytes) },

{CoordinatorStateManifest, CoordinatorStateFromBinary},
{ShardRegionRegisteredManifest, bytes => new PersistentShardCoordinator.ShardRegionRegistered(ActorRefMessageFromBinary(bytes)) },
Expand All @@ -85,7 +93,7 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{RegisterProxyManifest, bytes => new PersistentShardCoordinator.RegisterProxy(ActorRefMessageFromBinary(bytes)) },
{RegisterAckManifest, bytes => new PersistentShardCoordinator.RegisterAck(ActorRefMessageFromBinary(bytes)) },
{GetShardHomeManifest, bytes => new PersistentShardCoordinator.GetShardHome(ShardIdMessageFromBinary(bytes)) },
{ShardHomeManifest, ShardHomeFromBinary},
{ShardHomeManifest, bytes => ShardHomeFromBinary(bytes) },
{HostShardManifest, bytes => new PersistentShardCoordinator.HostShard(ShardIdMessageFromBinary(bytes)) },
{ShardStartedManifest, bytes => new PersistentShardCoordinator.ShardStarted(ShardIdMessageFromBinary(bytes)) },
{BeginHandOffManifest, bytes => new PersistentShardCoordinator.BeginHandOff(ShardIdMessageFromBinary(bytes)) },
Expand All @@ -95,10 +103,14 @@ public ClusterShardingMessageSerializer(ExtendedActorSystem system) : base(syste
{GracefulShutdownReqManifest, bytes => new PersistentShardCoordinator.GracefulShutdownRequest(ActorRefMessageFromBinary(bytes)) },

{GetShardStatsManifest, bytes => Shard.GetShardStats.Instance },
{ShardStatsManifest, ShardStatsFromBinary},

{StartEntityManifest, StartEntityFromBinary },
{StartEntityAckManifest, StartEntityAckFromBinary}
{ShardStatsManifest, bytes => ShardStatsFromBinary(bytes) },
{ GetShardRegionStatsManifest, bytes => GetShardRegionStats.Instance },
{ ShardRegionStatsManifest, bytes => ShardRegionStatsFromBinary(bytes) },
{ GetClusterShardingStatsManifest, bytes => GetClusterShardingStatsFromBinary(bytes) },
{ ClusterShardingStatsManifest, bytes => ClusterShardingStatsFromBinary(bytes) },

{StartEntityManifest, bytes => StartEntityFromBinary(bytes) },
{StartEntityAckManifest, bytes => StartEntityAckFromBinary(bytes) }
};
}

Expand Down Expand Up @@ -140,6 +152,10 @@ public override byte[] ToBinary(object obj)
case ShardRegion.StartEntityAck o: return StartEntityAckToProto(o).ToByteArray();
case Shard.GetShardStats o: return new byte[0];
case Shard.ShardStats o: return ShardStatsToProto(o).ToByteArray();
case GetShardRegionStats o: return new byte[0];
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
case ShardRegionStats o: return ShardRegionStatsToProto(o).ToByteArray();
case GetClusterShardingStats o: return GetClusterShardingStatsToProto(o).ToByteArray();
case ClusterShardingStats o: return ClusterShardingStatsToProto(o).ToByteArray();
}
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -202,6 +218,10 @@ public override string Manifest(object o)
case ShardRegion.StartEntityAck _: return StartEntityAckManifest;
case Shard.GetShardStats _: return GetShardStatsManifest;
case Shard.ShardStats _: return ShardStatsManifest;
case GetShardRegionStats _: return GetShardRegionStatsManifest;
case ShardRegionStats _: return ShardRegionStatsManifest;
case GetClusterShardingStats _: return GetClusterShardingStatsManifest;
case ClusterShardingStats _: return ClusterShardingStatsManifest;
}
throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{this.GetType()}]");
}
Expand Down Expand Up @@ -378,14 +398,14 @@ private IActorRef ActorRefMessageFromBinary(byte[] binary)
// Shard.ShardState
//

private Proto.Msg.EntityState EntityStateToProto(Shard.ShardState entityState)
private static Proto.Msg.EntityState EntityStateToProto(Shard.ShardState entityState)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
var message = new Proto.Msg.EntityState();
message.Entities.AddRange(entityState.Entries);
return message;
}

private Shard.ShardState EntityStateFromBinary(byte[] bytes)
private static Shard.ShardState EntityStateFromBinary(byte[] bytes)
{
var msg = Proto.Msg.EntityState.Parser.ParseFrom(bytes);
return new Shard.ShardState(msg.Entities.ToImmutableHashSet());
Expand All @@ -401,7 +421,77 @@ private Proto.Msg.ShardIdMessage ShardIdMessageToProto(string shard)
return message;
}

private string ShardIdMessageFromBinary(byte[] bytes)
// ShardRegionStats
private static Proto.Msg.ShardRegionStats ShardRegionStatsToProto(ShardRegionStats s)
{
var message = new Proto.Msg.ShardRegionStats();
message.Stats.Add((IDictionary<string,int>)s.Stats);
return message;
}

private static ShardRegionStats ShardRegionStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ShardRegionStats.Parser.ParseFrom(b);
return new ShardRegionStats(p.Stats.ToImmutableDictionary());
}

// GetClusterShardingStats
private static Proto.Msg.GetClusterShardingStats GetClusterShardingStatsToProto(GetClusterShardingStats stats)
{
var p = new Proto.Msg.GetClusterShardingStats();
p.Timeout = Duration.FromTimeSpan(stats.Timeout);
return p;
}

private static GetClusterShardingStats GetClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.GetClusterShardingStats.Parser.ParseFrom(b);
return new GetClusterShardingStats(p.Timeout.ToTimeSpan());
}

// ClusterShardingStats
private static Proto.Msg.ClusterShardingStats ClusterShardingStatsToProto(ClusterShardingStats stats)
{
var p = new Proto.Msg.ClusterShardingStats();
foreach (var s in stats.Regions)
{
p.Regions.Add(new ShardRegionWithAddress() { NodeAddress = AddressToProto(s.Key), Stats = ShardRegionStatsToProto(s.Value)});
}

return p;
}

private static ClusterShardingStats ClusterShardingStatsFromBinary(byte[] b)
{
var p = Proto.Msg.ClusterShardingStats.Parser.ParseFrom(b);
var dict = new Dictionary<Address, ShardRegionStats>();
foreach (var s in p.Regions)
{
dict[AddressFrom(s.NodeAddress)] = new ShardRegionStats(s.Stats.Stats.ToImmutableDictionary());
}
return new ClusterShardingStats(dict.ToImmutableDictionary());
}

private static AddressData AddressToProto(Address address)
{
var message = new AddressData();
message.System = address.System;
message.Hostname = address.Host;
message.Port = (uint)(address.Port ?? 0);
message.Protocol = address.Protocol;
return message;
}

private static Address AddressFrom(AddressData addressProto)
{
return new Address(
addressProto.Protocol,
addressProto.System,
addressProto.Hostname,
addressProto.Port == 0 ? null : (int?)addressProto.Port);
}

private static string ShardIdMessageFromBinary(byte[] bytes)
{
return Proto.Msg.ShardIdMessage.Parser.ParseFrom(bytes).Shard;
}
Expand Down
Loading