Skip to content

Commit

Permalink
Separate Metrics business and wire format models (#7067)
Browse files Browse the repository at this point in the history
* Separate Metrics business and wire format models

* Move protobuf generated file to internal

* Update API approval list

* Fix API approval list

* Fix equality and comparator errors

* Update API approval
  • Loading branch information
Arkatufus authored Jan 19, 2024
1 parent 284d33a commit 0cb2881
Show file tree
Hide file tree
Showing 11 changed files with 1,114 additions and 602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ public interface IClusterMetricMessage { }
/// Envelope adding a sender address to the cluster metrics gossip.
/// </summary>
[InternalApi]
public sealed partial class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
public sealed class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
{
/// <summary>
/// Akka's actor address
/// </summary>
public Actor.Address FromAddress { get; }

public MetricsGossip Gossip { get; }

public bool Reply { get; }

/// <summary>
/// Creates new instance of <see cref="MetricsGossipEnvelope"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
Expand All @@ -15,7 +14,6 @@
using Akka.Serialization;
using Akka.Util;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Akka.Cluster.Metrics.Serialization
{
Expand Down Expand Up @@ -130,7 +128,7 @@ public override string Manifest(object o)

private byte[] AdaptiveLoadBalancingPoolToBinary(Metrics.AdaptiveLoadBalancingPool pool)
{
var proto = new AdaptiveLoadBalancingPool()
var proto = new Proto.AdaptiveLoadBalancingPool()
{
NrOfInstances = (uint)pool.NrOfInstances,
UsePoolDispatcher = pool.UsePoolDispatcher
Expand All @@ -145,11 +143,11 @@ private byte[] AdaptiveLoadBalancingPoolToBinary(Metrics.AdaptiveLoadBalancingPo
return proto.ToByteArray();
}

private MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)
private Proto.MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)
{
var serializer = _serialization.Value.FindSerializerFor(selector);

return new MetricsSelector()
return new Proto.MetricsSelector()
{
Data = ByteString.CopyFrom(serializer.ToBinary(selector)),
SerializerId = (uint)serializer.Identifier,
Expand All @@ -159,7 +157,7 @@ private MetricsSelector MetricsSelectorToProto(IMetricsSelector selector)

private byte[] MixMetricsSelectorToBinary(Metrics.MixMetricsSelector selector)
{
var proto = new MixMetricsSelector()
var proto = new Proto.MixMetricsSelector()
{
Selectors = { selector.Selectors.Select(MetricsSelectorToProto) }
};
Expand All @@ -169,9 +167,9 @@ private byte[] MixMetricsSelectorToBinary(Metrics.MixMetricsSelector selector)
/// <summary>
/// Converts Akka.NET type into Protobuf serializable message
/// </summary>
private AddressData AddressToProto(Actor.Address address)
private Proto.AddressData AddressToProto(Actor.Address address)
{
return new AddressData()
return new Proto.AddressData()
{
Hostname = address.Host,
Protocol = address.Protocol,
Expand All @@ -185,7 +183,7 @@ private AddressData AddressToProto(Actor.Address address)
/// </summary>
/// <param name="address"></param>
/// <returns></returns>
private Akka.Actor.Address AddressFromProto(AddressData address)
private Akka.Actor.Address AddressFromProto(Proto.AddressData address)
{
return new Akka.Actor.Address(address.Protocol, address.System, address.Hostname, (int)address.Port);
}
Expand All @@ -200,10 +198,10 @@ private int MapWithErrorMessage<T>(IImmutableDictionary<T, int> dict, T value, s

private MetricsGossipEnvelope MetricsGossipEnvelopeFromBinary(byte[] bytes)
{
return MetricsGossipEnvelopeFromProto(MetricsGossipEnvelope.Parser.ParseFrom(Decompress(bytes)));
return MetricsGossipEnvelopeFromProto(Proto.MetricsGossipEnvelope.Parser.ParseFrom(Decompress(bytes)));
}

private MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope envelope)
private Proto.MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope envelope)
{
var allNodeMetrics = envelope.Gossip.Nodes;
var allAddresses = allNodeMetrics.Select(m => m.Address).ToImmutableArray();
Expand All @@ -216,28 +214,32 @@ private MetricsGossipEnvelope MetricsGossipEnvelopeToProto(MetricsGossipEnvelope
int MapAddress(Actor.Address address) => MapWithErrorMessage(addressMapping, address, "address");
int MapName(string name) => MapWithErrorMessage(metricNamesMapping, name, "metric name");

Option<NodeMetrics.Types.EWMA> EwmaToProto(Option<NodeMetrics.Types.EWMA> ewma)
=> ewma.Select(e => new NodeMetrics.Types.EWMA(e.Value, e.Alpha));
Option<Proto.NodeMetrics.Types.EWMA> EwmaToProto(Option<NodeMetrics.Types.EWMA> ewma)
=> ewma.Select(e => new Proto.NodeMetrics.Types.EWMA
{
Value = e.Value,
Alpha = e.Alpha
});

NodeMetrics.Types.Number NumberToProto(AnyNumber number)
Proto.NodeMetrics.Types.Number NumberToProto(AnyNumber number)
{
var proto = new NodeMetrics.Types.Number();
var proto = new Proto.NodeMetrics.Types.Number();
switch (number.Type)
{
case AnyNumber.NumberType.Int:
proto.Type = NodeMetrics.Types.NumberType.Integer;
proto.Type = Proto.NodeMetrics.Types.NumberType.Integer;
proto.Value32 = Convert.ToUInt32(number.LongValue);
break;
case AnyNumber.NumberType.Long:
proto.Type = NodeMetrics.Types.NumberType.Long;
proto.Type = Proto.NodeMetrics.Types.NumberType.Long;
proto.Value64 = Convert.ToUInt64(number.LongValue);
break;
case AnyNumber.NumberType.Float:
proto.Type = NodeMetrics.Types.NumberType.Float;
proto.Type = Proto.NodeMetrics.Types.NumberType.Float;
proto.Value32 = (uint)BitConverter.ToInt32(BitConverter.GetBytes((float)number.DoubleValue), 0);
break;
case AnyNumber.NumberType.Double:
proto.Type = NodeMetrics.Types.NumberType.Double;
proto.Type = Proto.NodeMetrics.Types.NumberType.Double;
proto.Value64 = (ulong)BitConverter.DoubleToInt64Bits(number.DoubleValue);
break;
default:
Expand All @@ -247,9 +249,9 @@ NodeMetrics.Types.Number NumberToProto(AnyNumber number)
return proto;
}

NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
Proto.NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
{
var metric = new NodeMetrics.Types.Metric()
var metric = new Proto.NodeMetrics.Types.Metric()
{
NameIndex = MapName(m.Name),
Number = NumberToProto(m.Value),
Expand All @@ -262,9 +264,9 @@ NodeMetrics.Types.Metric MetricToProto(NodeMetrics.Types.Metric m)
return metric;
}

NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
Proto.NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
{
return new NodeMetrics()
return new Proto.NodeMetrics()
{
AddressIndex = MapAddress(nodeMetrics.Address),
Timestamp = nodeMetrics.Timestamp,
Expand All @@ -274,11 +276,11 @@ NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)

var nodeMetricsProto = allNodeMetrics.Select(NodeMetricsToProto);

return new MetricsGossipEnvelope()
return new Proto.MetricsGossipEnvelope()
{
From = AddressToProto(envelope.FromAddress),
Reply = envelope.Reply,
Gossip = new MetricsGossip()
Gossip = new Proto.MetricsGossip()
{
AllAddresses = { allAddresses.Select(AddressToProto) },
AllMetricNames = { allMetricNames },
Expand All @@ -287,50 +289,50 @@ NodeMetrics NodeMetricsToProto(NodeMetrics nodeMetrics)
};
}

private MetricsGossipEnvelope MetricsGossipEnvelopeFromProto(MetricsGossipEnvelope envelope)
private MetricsGossipEnvelope MetricsGossipEnvelopeFromProto(Proto.MetricsGossipEnvelope envelope)
{
var gossip = envelope.Gossip;
var addressMapping = gossip.AllAddresses.Select(AddressFromProto).ToImmutableArray();
var metricNameMapping = gossip.AllMetricNames.ToImmutableArray();

Option<NodeMetrics.Types.EWMA> EwmaFromProto(NodeMetrics.Types.EWMA ewma)
Option<NodeMetrics.Types.EWMA> EwmaFromProto(Proto.NodeMetrics.Types.EWMA ewma)
=> new NodeMetrics.Types.EWMA(ewma.Value, ewma.Alpha);

AnyNumber NumberFromProto(NodeMetrics.Types.Number number)
AnyNumber NumberFromProto(Proto.NodeMetrics.Types.Number number)
{
switch (number.Type)
{
case NodeMetrics.Types.NumberType.Double:
case Proto.NodeMetrics.Types.NumberType.Double:
return BitConverter.Int64BitsToDouble((long)number.Value64);
case NodeMetrics.Types.NumberType.Float:
case Proto.NodeMetrics.Types.NumberType.Float:
return BitConverter.ToSingle(BitConverter.GetBytes((int)number.Value32), 0);
case NodeMetrics.Types.NumberType.Integer:
case Proto.NodeMetrics.Types.NumberType.Integer:
return Convert.ToInt32(number.Value32);
case NodeMetrics.Types.NumberType.Long:
case Proto.NodeMetrics.Types.NumberType.Long:
return Convert.ToInt64(number.Value64);
case NodeMetrics.Types.NumberType.Serialized:
case Proto.NodeMetrics.Types.NumberType.Serialized:
// TODO: Should we somehow port this?
/*val in = new ClassLoaderObjectInputStream(
system.dynamicAccess.classLoader,
new ByteArrayInputStream(number.getSerialized.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]*/
throw new NotImplementedException($"{NodeMetrics.Types.NumberType.Serialized} number type is not supported");
throw new NotImplementedException($"{Proto.NodeMetrics.Types.NumberType.Serialized} number type is not supported");
default:
throw new ArgumentOutOfRangeException(nameof(number));
}
}

NodeMetrics.Types.Metric MetricFromProto(NodeMetrics.Types.Metric metric)
NodeMetrics.Types.Metric MetricFromProto(Proto.NodeMetrics.Types.Metric metric)
{
return new NodeMetrics.Types.Metric(
metricNameMapping[metric.NameIndex],
NumberFromProto(metric.Number),
metric.Ewma != null ? EwmaFromProto(metric.Ewma) : Option<NodeMetrics.Types.EWMA>.None);
}

NodeMetrics NodeMetricsFromProto(NodeMetrics metrics)
NodeMetrics NodeMetricsFromProto(Proto.NodeMetrics metrics)
{
return new NodeMetrics(
addressMapping[metrics.AddressIndex],
Expand All @@ -345,7 +347,7 @@ NodeMetrics NodeMetricsFromProto(NodeMetrics metrics)

private Metrics.AdaptiveLoadBalancingPool AdaptiveLoadBalancingPoolFromBinary(byte[] bytes)
{
var proto = AdaptiveLoadBalancingPool.Parser.ParseFrom(bytes);
var proto = Proto.AdaptiveLoadBalancingPool.Parser.ParseFrom(bytes);

IMetricsSelector selector;
if (proto.MetricsSelector != null)
Expand All @@ -368,15 +370,15 @@ private Metrics.AdaptiveLoadBalancingPool AdaptiveLoadBalancingPoolFromBinary(by

private Metrics.MixMetricsSelector MixMetricSelectorFromBinary(byte[] bytes)
{
var proto = MixMetricsSelector.Parser.ParseFrom(bytes);
var proto = Proto.MixMetricsSelector.Parser.ParseFrom(bytes);
return new Metrics.MixMetricsSelector(proto.Selectors.Select(s =>
{
// should be safe because we serialized only the right subtypes of MetricsSelector
return MetricSelectorFromProto(s) as CapacityMetricsSelector;
}).ToImmutableArray());
}

private IMetricsSelector MetricSelectorFromProto(Serialization.MetricsSelector selector)
private IMetricsSelector MetricSelectorFromProto(Serialization.Proto.MetricsSelector selector)
{
return _serialization.Value.Deserialize(selector.Data.ToByteArray(), (int)selector.SerializerId, selector.Manifest) as IMetricsSelector;
}
Expand Down
37 changes: 32 additions & 5 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/EWMA.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public static partial class Types
/// the sampled value resulting from the previous smoothing iteration.
/// This value is always used as the previous EWMA to calculate the new EWMA.
/// </summary>
public sealed partial class EWMA
public sealed class EWMA: IEquatable<EWMA>
{
public double Value { get; }

public double Alpha { get; }

/// <summary>
/// Creates new instance of <see cref="EWMA"/>
/// </summary>
Expand All @@ -47,10 +51,10 @@ public sealed partial class EWMA
public EWMA(double value, double alpha)
{
if (alpha is < 0 or > 1)
throw new ArgumentException(nameof(alpha), "alpha must be between 0.0 and 1.0");
throw new ArgumentException("alpha must be between 0.0 and 1.0", nameof(alpha));

value_ = value;
alpha_ = alpha;
Value = value;
Alpha = alpha;
}

/// <summary>
Expand Down Expand Up @@ -83,11 +87,34 @@ public static double GetAlpha(TimeSpan halfLife, TimeSpan collectInterval)

var halfLifeMillis = halfLife.TotalMilliseconds;
if (halfLifeMillis <= 0)
throw new ArgumentException(nameof(halfLife), "halfLife must be > 0 s");
throw new ArgumentException("halfLife must be > 0 s", nameof(halfLife));

var decayRate = logOf2 / halfLifeMillis;
return 1 - Math.Exp(-decayRate * collectInterval.TotalMilliseconds);
}

public bool Equals(EWMA other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Value.Equals(other.Value) && Alpha.Equals(other.Alpha);
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is EWMA other && Equals(other);
}

public override int GetHashCode()
{
unchecked
{
var hash = 17;
hash = hash * 23 + Value.GetHashCode();
hash = hash * 23 + Alpha.GetHashCode();
return hash;
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
//-----------------------------------------------------------------------

using System;
using System.ComponentModel;
using Akka.Annotations;
using Akka.Cluster.Metrics.Helpers;
using Akka.Dispatch.SysMsg;
using Akka.Util;

#nullable enable
namespace Akka.Cluster.Metrics.Serialization
{
public sealed partial class NodeMetrics
Expand All @@ -23,7 +22,7 @@ public static partial class Types
///
/// Equality of Metric is based on its name index.
/// </summary>
public sealed partial class Metric
public sealed class Metric: IEquatable<Metric>
{
/// <summary>
/// Metric average value
Expand Down Expand Up @@ -79,7 +78,6 @@ public Metric(string name, AnyNumber value, Option<EWMA> average)
Name = name;
Value = value;
Average = average;
ewma_ = average.HasValue ? average.Value : default(EWMA);
}

/// <summary>
Expand Down Expand Up @@ -181,7 +179,7 @@ public bool Equals(Metric other)

public override int GetHashCode()
{
return (Name != null ? Name.GetHashCode() : 0);
return Name.GetHashCode();
}
}
}
Expand Down
Loading

0 comments on commit 0cb2881

Please sign in to comment.