Skip to content

Commit

Permalink
[otlp] OTLP Exporter Custom serializer - (Part 3) Outstanding TODOs (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#5975)

Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
  • Loading branch information
TimothyMothra and CodeBlanch authored Nov 14, 2024
1 parent 74fc70e commit b201d70
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal static class ProtobufOtlpMetricSerializer
private static readonly Stack<List<Metric>> MetricListPool = [];
private static readonly Dictionary<string, List<Metric>> ScopeMetricsList = [];

private delegate int WriteExemplarFunc(byte[] buffer, int writePosition, in Exemplar exemplar);

internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources.Resource? resource, in Batch<Metric> batch)
{
foreach (var metric in batch)
Expand Down Expand Up @@ -94,8 +96,6 @@ private static int WriteScopeMetric(byte[] buffer, int writePosition, string met

if (meterTags != null)
{
// TODO: Need to add unit tests for Instrumentation Scope Attributes (MeterTags)

if (meterTags is IReadOnlyList<KeyValuePair<string, object?>> readonlyMeterTags)
{
for (int i = 0; i < readonlyMeterTags.Count; i++)
Expand Down Expand Up @@ -266,13 +266,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
}
}

if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Exemplars);
}
}
writePosition = WriteDoubleExemplars(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Exemplars, in metricPoint);

ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
}
Expand Down Expand Up @@ -336,13 +330,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)

ProtobufSerializer.WriteReservedLength(buffer, positiveBucketsLengthPosition, writePosition - (positiveBucketsLengthPosition + ReserveSizeForLength));

if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Exemplars);
}
}
writePosition = WriteDoubleExemplars(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Exemplars, in metricPoint);

ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
}
Expand Down Expand Up @@ -381,7 +369,12 @@ private static int WriteNumberDataPoint(byte[] buffer, int writePosition, int fi
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.LongValue, ProtobufOtlpMetricFieldNumberConstants.NumberDataPoint_Exemplars);
writePosition = WriteExemplar(
buffer,
writePosition,
in exemplar,
ProtobufOtlpMetricFieldNumberConstants.NumberDataPoint_Exemplars,
static (byte[] buffer, int writePosition, in Exemplar exemplar) => ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Value_As_Int, (ulong)exemplar.LongValue));
}
}

Expand Down Expand Up @@ -409,13 +402,7 @@ private static int WriteNumberDataPoint(byte[] buffer, int writePosition, int fi
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.NumberDataPoint_Attributes);
}

if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.NumberDataPoint_Exemplars);
}
}
writePosition = WriteDoubleExemplars(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.NumberDataPoint_Exemplars, in metricPoint);

ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
return writePosition;
Expand All @@ -439,41 +426,52 @@ private static int WriteTag(byte[] buffer, int writePosition, KeyValuePair<strin
return otlpTagWriterState.WritePosition;
}

private static int WriteExemplar(byte[] buffer, int writePosition, in Exemplar exemplar, long value, int fieldNumber)
private static int WriteDoubleExemplars(byte[] buffer, int writePosition, int fieldNumber, in MetricPoint metricPoint)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, fieldNumber, ProtobufWireType.LEN);
int exemplarLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

// TODO: Need to serialize exemplar.FilteredTags and add unit tests.

// Casting to ulong is ok here as the bit representation for long versus ulong will be the same
// The difference would in the way the bit representation is interpreted on decoding side (signed versus unsigned)
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Value_As_Int, (ulong)value);

var time = (ulong)exemplar.Timestamp.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Time_Unix_Nano, time);

// TODO: Need to serialize exemplar.SpanID and exemplar.TraceId and add unit tests.
if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(
buffer,
writePosition,
in exemplar,
fieldNumber,
static (byte[] buffer, int writePosition, in Exemplar exemplar) => ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Value_As_Double, exemplar.DoubleValue));
}
}

ProtobufSerializer.WriteReservedLength(buffer, exemplarLengthPosition, writePosition - (exemplarLengthPosition + ReserveSizeForLength));
return writePosition;
}

private static int WriteExemplar(byte[] buffer, int writePosition, in Exemplar exemplar, double value, int fieldNumber)
private static int WriteExemplar(byte[] buffer, int writePosition, in Exemplar exemplar, int fieldNumber, WriteExemplarFunc writeValueFunc)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, fieldNumber, ProtobufWireType.LEN);
int exemplarLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

// TODO: Need to serialize exemplar.FilteredTags and add unit tests.
foreach (var tag in exemplar.FilteredTags)
{
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Filtered_Attributes);
}

writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Value_As_Double, value);
writePosition = writeValueFunc(buffer, writePosition, in exemplar);

var time = (ulong)exemplar.Timestamp.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Time_Unix_Nano, time);

// TODO: Need to serialize exemplar.SpanID and exemplar.TraceId and add unit tests.
if (exemplar.SpanId != default)
{
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, SpanIdSize, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Span_Id, ProtobufWireType.LEN);
var spanIdBytes = new Span<byte>(buffer, writePosition, SpanIdSize);
exemplar.SpanId.CopyTo(spanIdBytes);
writePosition += SpanIdSize;

writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, TraceIdSize, ProtobufOtlpMetricFieldNumberConstants.Exemplar_Trace_Id, ProtobufWireType.LEN);
var traceIdBytes = new Span<byte>(buffer, writePosition, TraceIdSize);
exemplar.TraceId.CopyTo(traceIdBytes);
writePosition += TraceIdSize;
}

ProtobufSerializer.WriteReservedLength(buffer, exemplarLengthPosition, writePosition - (exemplarLengthPosition + ReserveSizeForLength));
return writePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ public void ToOtlpResourceMetricsTest(bool useCustomSerializer, bool includeServ

var metrics = new List<Metric>();

using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{includeServiceNameInResource}", "0.0.1");
var meterTags = new KeyValuePair<string, object?>[]
{
new("key1", "value1"),
new("key2", "value2"),
};

using var meter = new Meter(name: $"{Utils.GetCurrentMethodName()}.{includeServiceNameInResource}", version: "0.0.1", tags: meterTags);
using var provider = Sdk.CreateMeterProviderBuilder()
.SetResourceBuilder(resourceBuilder)
.AddMeter(meter.Name)
Expand Down Expand Up @@ -223,6 +229,10 @@ public void ToOtlpResourceMetricsTest(bool useCustomSerializer, bool includeServ
Assert.Equal(string.Empty, instrumentationLibraryMetrics.SchemaUrl);
Assert.Equal(meter.Name, instrumentationLibraryMetrics.Scope.Name);
Assert.Equal("0.0.1", instrumentationLibraryMetrics.Scope.Version);

Assert.Equal(2, instrumentationLibraryMetrics.Scope.Attributes.Count);
Assert.Contains(instrumentationLibraryMetrics.Scope.Attributes, (kvp) => kvp.Key == "key1" && kvp.Value.StringValue == "value1");
Assert.Contains(instrumentationLibraryMetrics.Scope.Attributes, (kvp) => kvp.Key == "key2" && kvp.Value.StringValue == "value2");
}

[Theory]
Expand Down Expand Up @@ -885,11 +895,16 @@ public void TestTemporalityPreferenceUsingEnvVar(string configValue, MetricReade
}

[Theory]
[InlineData(false, false)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(true, true)]
public void ToOtlpExemplarTests(bool enableTagFiltering, bool enableTracing)
[InlineData(true, false, false)]
[InlineData(true, true, false)]
[InlineData(true, false, true)]
[InlineData(true, true, true)]

[InlineData(false, false, false)]
[InlineData(false, true, false)]
[InlineData(false, false, true)]
[InlineData(false, true, true)]
public void ToOtlpExemplarTests(bool useCustomSerializer, bool enableTagFiltering, bool enableTracing)
{
ActivitySource? activitySource = null;
Activity? activity = null;
Expand Down Expand Up @@ -933,37 +948,42 @@ public void ToOtlpExemplarTests(bool enableTagFiltering, bool enableTracing)

meterProvider.ForceFlush();

var counterDoubleMetric = exportedItems.FirstOrDefault(m => m.Name == counterDouble.Name);
var counterLongMetric = exportedItems.FirstOrDefault(m => m.Name == counterLong.Name);
var batch = new Batch<Metric>(exportedItems.ToArray(), exportedItems.Count);
var request = new OtlpCollector.ExportMetricsServiceRequest();
if (useCustomSerializer)
{
request = CreateMetricExportRequest(batch, ResourceBuilder.CreateEmpty().Build());
}
else
{
request.AddMetrics(ResourceBuilder.CreateEmpty().Build().ToOtlpResource(), batch);
}

Assert.NotNull(counterDoubleMetric);
Assert.NotNull(counterLongMetric);
Assert.Single(request.ResourceMetrics);
var resourceMetric = request.ResourceMetrics.First();
var otlpResource = resourceMetric.Resource;

AssertExemplars(1.18D, counterDoubleMetric);
AssertExemplars(18L, counterLongMetric);
Assert.Single(resourceMetric.ScopeMetrics);
var instrumentationLibraryMetrics = resourceMetric.ScopeMetrics.First();
Assert.Equal(meter.Name, instrumentationLibraryMetrics.Scope.Name);

var scopeMetrics = resourceMetric.ScopeMetrics.Single();
var otlpCounterDoubleMetric = scopeMetrics.Metrics.Single(m => m.Name == counterDouble.Name);
var otlpCounterLongMetric = scopeMetrics.Metrics.Single(m => m.Name == counterLong.Name);

AssertExemplars(1.18D, otlpCounterDoubleMetric);
AssertExemplars(18L, otlpCounterLongMetric);

activity?.Dispose();
tracerProvider?.Dispose();
activitySource?.Dispose();

void AssertExemplars<T>(T value, Metric metric)
void AssertExemplars<T>(T value, OtlpMetrics.Metric metric)
where T : struct
{
var metricPointEnumerator = metric.GetMetricPoints().GetEnumerator();
Assert.True(metricPointEnumerator.MoveNext());

ref readonly var metricPoint = ref metricPointEnumerator.Current;

var result = metricPoint.TryGetExemplars(out var exemplars);
Assert.True(result);

var exemplarEnumerator = exemplars.GetEnumerator();
Assert.True(exemplarEnumerator.MoveNext());

ref readonly var exemplar = ref exemplarEnumerator.Current;

var otlpExemplar = MetricItemExtensions.ToOtlpExemplar<T>(value, in exemplar);
Assert.NotNull(otlpExemplar);
Assert.Single(metric.Sum.DataPoints);
var dataPoint = metric.Sum.DataPoints.First();
var otlpExemplar = dataPoint.Exemplars.First();

Assert.NotEqual(default, otlpExemplar.TimeUnixNano);
if (!enableTracing)
Expand All @@ -986,30 +1006,30 @@ void AssertExemplars<T>(T value, Metric metric)

if (typeof(T) == typeof(long))
{
Assert.Equal((long)(object)value, exemplar.LongValue);
Assert.Equal((long)(object)value, otlpExemplar.AsInt);
}
else if (typeof(T) == typeof(double))
{
Assert.Equal((double)(object)value, exemplar.DoubleValue);
Assert.Equal((double)(object)value, otlpExemplar.AsDouble);
}
else
{
Debug.Fail("Unexpected type");
Assert.Fail("Unexpected type");
}

if (!enableTagFiltering)
{
var tagEnumerator = exemplar.FilteredTags.GetEnumerator();
var tagEnumerator = otlpExemplar.FilteredAttributes.GetEnumerator();
Assert.False(tagEnumerator.MoveNext());
}
else
{
var tagEnumerator = exemplar.FilteredTags.GetEnumerator();
var tagEnumerator = otlpExemplar.FilteredAttributes.GetEnumerator();
Assert.True(tagEnumerator.MoveNext());

var tag = tagEnumerator.Current;
Assert.Equal("key1", tag.Key);
Assert.Equal("value1", tag.Value);
Assert.Equal("value1", tag.Value.StringValue);
}
}
}
Expand Down

0 comments on commit b201d70

Please sign in to comment.