Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Metrics business and wire format models #7067

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ public interface IClusterMetricMessage { }
/// Envelope adding a sender address to the cluster metrics gossip.
/// </summary>
[InternalApi]
public sealed partial class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
public sealed class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
{
/// <summary>
/// Akka's actor address
/// </summary>
public Actor.Address FromAddress { get; }

public MetricsGossip Gossip { get; }

public bool Reply { get; }

Comment on lines +35 to +38
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added missing fields that, before, came from the protobuf generated code

/// <summary>
/// Creates new instance of <see cref="MetricsGossipEnvelope"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
Expand All @@ -15,7 +14,6 @@
using Akka.Serialization;
using Akka.Util;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Akka.Cluster.Metrics.Serialization
{
Expand Down Expand Up @@ -130,7 +128,7 @@ public override string Manifest(object o)

private byte[] AdaptiveLoadBalancingPoolToBinary(Metrics.AdaptiveLoadBalancingPool pool)
{
var proto = new AdaptiveLoadBalancingPool()
var proto = new Proto.AdaptiveLoadBalancingPool()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the protobuf generated codes to a new namespace, hence the change.

{
NrOfInstances = (uint)pool.NrOfInstances,
UsePoolDispatcher = pool.UsePoolDispatcher
Expand All @@ -145,11 +143,11 @@ private byte[] AdaptiveLoadBalancingPoolToBinary(Metrics.AdaptiveLoadBalancingPo
return proto.ToByteArray();
}

private MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)
private Proto.MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)
{
var serializer = _serialization.Value.FindSerializerFor(selector);

return new MetricsSelector()
return new Proto.MetricsSelector()
{
Data = ByteString.CopyFrom(serializer.ToBinary(selector)),
SerializerId = (uint)serializer.Identifier,
Expand All @@ -159,7 +157,7 @@ private MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)

private byte[] MixMetricsSelectorToBinary(Metrics.MixMetricsSelector selector)
{
var proto = new MixMetricsSelector()
var proto = new Proto.MixMetricsSelector()
{
Selectors = { selector.Selectors.Select(MetricsSelectorToProto) }
};
Expand All @@ -169,9 +167,9 @@ private byte[] MixMetricsSelectorToBinary(Metrics.MixMetricsSelector selector)
/// <summary>
/// Converts Akka.NET type into Protobuf serializable message
/// </summary>
private AddressData AddressToProto(Actor.Address address)
private Proto.AddressData AddressToProto(Actor.Address address)
{
return new AddressData()
return new Proto.AddressData()
{
Hostname = address.Host,
Protocol = address.Protocol,
Expand All @@ -185,7 +183,7 @@ private AddressData AddressToProto(Actor.Address address)
/// </summary>
/// <param name="address"></param>
/// <returns></returns>
private Akka.Actor.Address AddressFromProto(AddressData address)
private Akka.Actor.Address AddressFromProto(Proto.AddressData address)
{
return new Akka.Actor.Address(address.Protocol, address.System, address.Hostname, (int)address.Port);
}
Expand All @@ -200,10 +198,10 @@ private int MapWithErrorMessage<T>(IImmutableDictionary<T, int> dict, T value, s

private MetricsGossipEnvelope MetricsGossipEnvelopeFromBinary(byte[] bytes)
{
return MetricsGossipEnvelopeFromProto(MetricsGossipEnvelope.Parser.ParseFrom(Decompress(bytes)));
return MetricsGossipEnvelopeFromProto(Proto.MetricsGossipEnvelope.Parser.ParseFrom(Decompress(bytes)));
}

private MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope envelope)
private Proto.MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope envelope)
{
var allNodeMetrics = envelope.Gossip.Nodes;
var allAddresses = allNodeMetrics.Select(m => m.Address).ToImmutableArray();
Expand All @@ -216,28 +214,32 @@ private MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope
int MapAddress(Actor.Address address) => MapWithErrorMessage(addressMapping, address, "address");
int MapName(string name) => MapWithErrorMessage(metricNamesMapping, name, "metric name");

Option<NodeMetrics.Types.EWMA> EwmaToProto(Option<NodeMetrics.Types.EWMA> ewma)
=> ewma.Select(e => new NodeMetrics.Types.EWMA(e.Value, e.Alpha));
Option<Proto.NodeMetrics.Types.EWMA> EwmaToProto(Option<NodeMetrics.Types.EWMA> ewma)
=> ewma.Select(e => new Proto.NodeMetrics.Types.EWMA
{
Value = e.Value,
Alpha = e.Alpha
});

NodeMetrics.Types.Number NumberToProto(AnyNumber number)
Proto.NodeMetrics.Types.Number NumberToProto(AnyNumber number)
{
var proto = new NodeMetrics.Types.Number();
var proto = new Proto.NodeMetrics.Types.Number();
switch (number.Type)
{
case AnyNumber.NumberType.Int:
proto.Type = NodeMetrics.Types.NumberType.Integer;
proto.Type = Proto.NodeMetrics.Types.NumberType.Integer;
proto.Value32 = Convert.ToUInt32(number.LongValue);
break;
case AnyNumber.NumberType.Long:
proto.Type = NodeMetrics.Types.NumberType.Long;
proto.Type = Proto.NodeMetrics.Types.NumberType.Long;
proto.Value64 = Convert.ToUInt64(number.LongValue);
break;
case AnyNumber.NumberType.Float:
proto.Type = NodeMetrics.Types.NumberType.Float;
proto.Type = Proto.NodeMetrics.Types.NumberType.Float;
proto.Value32 = (uint)BitConverter.ToInt32(BitConverter.GetBytes((float)number.DoubleValue), 0);
break;
case AnyNumber.NumberType.Double:
proto.Type = NodeMetrics.Types.NumberType.Double;
proto.Type = Proto.NodeMetrics.Types.NumberType.Double;
proto.Value64 = (ulong)BitConverter.DoubleToInt64Bits(number.DoubleValue);
break;
default:
Expand All @@ -247,9 +249,9 @@ NodeMetrics.Types.Number NumberToProto(AnyNumber number)
return proto;
}

NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
Proto.NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
{
var metric = new NodeMetrics.Types.Metric()
var metric = new Proto.NodeMetrics.Types.Metric()
{
NameIndex = MapName(m.Name),
Number = NumberToProto(m.Value),
Expand All @@ -262,9 +264,9 @@ NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
return metric;
}

NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
Proto.NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
{
return new NodeMetrics()
return new Proto.NodeMetrics()
{
AddressIndex = MapAddress(nodeMetrics.Address),
Timestamp = nodeMetrics.Timestamp,
Expand All @@ -274,11 +276,11 @@ NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)

var nodeMetricsProto = allNodeMetrics.Select(NodeMetricsToProto);

return new MetricsGossipEnvelope()
return new Proto.MetricsGossipEnvelope()
{
From = AddressToProto(envelope.FromAddress),
Reply = envelope.Reply,
Gossip = new MetricsGossip()
Gossip = new Proto.MetricsGossip()
{
AllAddresses = { allAddresses.Select(AddressToProto) },
AllMetricNames = { allMetricNames },
Expand All @@ -287,50 +289,50 @@ NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
};
}

private MetricsGossipEnvelope MetricsGossipEnvelopeFromProto(MetricsGossipEnvelope envelope)
private MetricsGossipEnvelope MetricsGossipEnvelopeFromProto(Proto.MetricsGossipEnvelope envelope)
{
var gossip = envelope.Gossip;
var addressMapping = gossip.AllAddresses.Select(AddressFromProto).ToImmutableArray();
var metricNameMapping = gossip.AllMetricNames.ToImmutableArray();

Option<NodeMetrics.Types.EWMA> EwmaFromProto(NodeMetrics.Types.EWMA ewma)
Option<NodeMetrics.Types.EWMA> EwmaFromProto(Proto.NodeMetrics.Types.EWMA ewma)
=> new NodeMetrics.Types.EWMA(ewma.Value, ewma.Alpha);

AnyNumber NumberFromProto(NodeMetrics.Types.Number number)
AnyNumber NumberFromProto(Proto.NodeMetrics.Types.Number number)
{
switch (number.Type)
{
case NodeMetrics.Types.NumberType.Double:
case Proto.NodeMetrics.Types.NumberType.Double:
return BitConverter.Int64BitsToDouble((long)number.Value64);
case NodeMetrics.Types.NumberType.Float:
case Proto.NodeMetrics.Types.NumberType.Float:
return BitConverter.ToSingle(BitConverter.GetBytes((int)number.Value32), 0);
case NodeMetrics.Types.NumberType.Integer:
case Proto.NodeMetrics.Types.NumberType.Integer:
return Convert.ToInt32(number.Value32);
case NodeMetrics.Types.NumberType.Long:
case Proto.NodeMetrics.Types.NumberType.Long:
return Convert.ToInt64(number.Value64);
case NodeMetrics.Types.NumberType.Serialized:
case Proto.NodeMetrics.Types.NumberType.Serialized:
// TODO: Should we somehow port this?
/*val in = new ClassLoaderObjectInputStream(
system.dynamicAccess.classLoader,
new ByteArrayInputStream(number.getSerialized.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]*/
throw new NotImplementedException($"{NodeMetrics.Types.NumberType.Serialized} number type is not supported");
throw new NotImplementedException($"{Proto.NodeMetrics.Types.NumberType.Serialized} number type is not supported");
default:
throw new ArgumentOutOfRangeException(nameof(number));
}
}

NodeMetrics.Types.Metric MetricFromProto(NodeMetrics.Types.Metric metric)
NodeMetrics.Types.Metric MetricFromProto(Proto.NodeMetrics.Types.Metric metric)
{
return new NodeMetrics.Types.Metric(
metricNameMapping[metric.NameIndex],
NumberFromProto(metric.Number),
metric.Ewma != null ? EwmaFromProto(metric.Ewma) : Option<NodeMetrics.Types.EWMA>.None);
}

NodeMetrics NodeMetricsFromProto(NodeMetrics metrics)
NodeMetrics NodeMetricsFromProto(Proto.NodeMetrics metrics)
{
return new NodeMetrics(
addressMapping[metrics.AddressIndex],
Expand All @@ -345,7 +347,7 @@ NodeMetrics NodeMetricsFromProto(NodeMetrics metrics)

private Metrics.AdaptiveLoadBalancingPool AdaptiveLoadBalancingPoolFromBinary(byte[] bytes)
{
var proto = AdaptiveLoadBalancingPool.Parser.ParseFrom(bytes);
var proto = Proto.AdaptiveLoadBalancingPool.Parser.ParseFrom(bytes);

IMetricsSelector selector;
if (proto.MetricsSelector != null)
Expand All @@ -368,15 +370,15 @@ private Metrics.AdaptiveLoadBalancingPool AdaptiveLoadBalancingPoolFromBinary(by

private Metrics.MixMetricsSelector MixMetricSelectorFromBinary(byte[] bytes)
{
var proto = MixMetricsSelector.Parser.ParseFrom(bytes);
var proto = Proto.MixMetricsSelector.Parser.ParseFrom(bytes);
return new Metrics.MixMetricsSelector(proto.Selectors.Select(s =>
{
// should be safe because we serialized only the right subtypes of MetricsSelector
return MetricSelectorFromProto(s) as CapacityMetricsSelector;
}).ToImmutableArray());
}

private IMetricsSelector MetricSelectorFromProto(Serialization.MetricsSelector selector)
private IMetricsSelector MetricSelectorFromProto(Serialization.Proto.MetricsSelector selector)
{
return _serialization.Value.Deserialize(selector.Data.ToByteArray(), (int)selector.SerializerId, selector.Manifest) as IMetricsSelector;
}
Expand Down
37 changes: 32 additions & 5 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/EWMA.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public static partial class Types
/// the sampled value resulting from the previous smoothing iteration.
/// This value is always used as the previous EWMA to calculate the new EWMA.
/// </summary>
public sealed partial class EWMA
public sealed class EWMA: IEquatable<EWMA>
{
public double Value { get; }

public double Alpha { get; }

Comment on lines +34 to +39
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added equality code and missing fields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in the old code, IEquatable was implemented by the protobuf generated code. We need to add this back.

/// <summary>
/// Creates new instance of <see cref="EWMA"/>
/// </summary>
Expand All @@ -47,10 +51,10 @@ public sealed partial class EWMA
public EWMA(double value, double alpha)
{
if (alpha is < 0 or > 1)
throw new ArgumentException(nameof(alpha), "alpha must be between 0.0 and 1.0");
throw new ArgumentException("alpha must be between 0.0 and 1.0", nameof(alpha));

value_ = value;
alpha_ = alpha;
Value = value;
Alpha = alpha;
}

/// <summary>
Expand Down Expand Up @@ -83,11 +87,34 @@ public static double GetAlpha(TimeSpan halfLife, TimeSpan collectInterval)

var halfLifeMillis = halfLife.TotalMilliseconds;
if (halfLifeMillis <= 0)
throw new ArgumentException(nameof(halfLife), "halfLife must be > 0 s");
throw new ArgumentException("halfLife must be > 0 s", nameof(halfLife));

var decayRate = logOf2 / halfLifeMillis;
return 1 - Math.Exp(-decayRate * collectInterval.TotalMilliseconds);
}

public bool Equals(EWMA other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Value.Equals(other.Value) && Alpha.Equals(other.Alpha);
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is EWMA other && Equals(other);
}

public override int GetHashCode()
{
unchecked
{
var hash = 17;
hash = hash * 23 + Value.GetHashCode();
hash = hash * 23 + Alpha.GetHashCode();
return hash;
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
//-----------------------------------------------------------------------

using System;
using System.ComponentModel;
using Akka.Annotations;
using Akka.Cluster.Metrics.Helpers;
using Akka.Dispatch.SysMsg;
using Akka.Util;

#nullable enable
namespace Akka.Cluster.Metrics.Serialization
{
public sealed partial class NodeMetrics
Expand All @@ -23,7 +22,7 @@ public static partial class Types
///
/// Equality of Metric is based on its name index.
/// </summary>
public sealed partial class Metric
public sealed class Metric: IEquatable<Metric>
{
/// <summary>
/// Metric average value
Expand Down Expand Up @@ -79,7 +78,6 @@ public Metric(string name, AnyNumber value, Option<EWMA> average)
Name = name;
Value = value;
Average = average;
ewma_ = average.HasValue ? average.Value : default(EWMA);
}

/// <summary>
Expand Down Expand Up @@ -181,7 +179,7 @@ public bool Equals(Metric other)

public override int GetHashCode()
{
return (Name != null ? Name.GetHashCode() : 0);
return Name.GetHashCode();
}
}
}
Expand Down
Loading