Skip to content

Commit

Permalink
[otlp] Buffer Size Handling with Retry Logic and OTLP Signal Path (#5988
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rajkumar-rangaraj authored Nov 22, 2024
1 parent e3665c9 commit f9a0b4c
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient ht
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.AppendSignalPathToEndpoint
? options.Endpoint.AppendPathIfNotPresent(signalPath)
: options.Endpoint;
Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ public void RetryStoredRequestException(string ex)
this.WriteEvent(13, ex);
}

[Event(14, Message = "{0} buffer exceeded the maximum allowed size. Current size: {1} bytes.", Level = EventLevel.Error)]
public void BufferExceededMaxSize(string signalType, int length)
{
this.WriteEvent(14, signalType, length);
}

[Event(15, Message = "{0} buffer resizing failed due to insufficient memory.", Level = EventLevel.Error)]
public void BufferResizeFailedDueToMemory(string signalType)
{
this.WriteEvent(15, signalType);
}

void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
{
this.InvalidConfigurationValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,41 @@ internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOpt
activities.Add(activity);
}

writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource, ScopeTracesList);
writePosition = TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
ReturnActivityListToPool();
ProtobufSerializer.WriteReservedLength(buffer, resourceSpansScopeSpansLengthPosition, writePosition - (resourceSpansScopeSpansLengthPosition + ReserveSizeForLength));

return writePosition;
}

internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource)
{
try
{
writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
}
catch (IndexOutOfRangeException)
{
// Attempt to increase the buffer size
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Traces))
{
throw;
}

// Retry serialization after increasing the buffer size.
// The recursion depth is limited to a maximum of 7 calls, as the buffer size starts at ~732 KB
// and doubles until it reaches the maximum size of 100 MB. This ensures the recursion remains safe
// and avoids stack overflow.
return TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
}
catch
{
throw;
}

return writePosition;
}

internal static void ReturnActivityListToPool()
{
if (ScopeTracesList.Count != 0)
Expand All @@ -57,19 +85,19 @@ internal static void ReturnActivityListToPool()
}
}

internal static int WriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, Dictionary<string, List<Activity>> scopeTraces)
internal static int WriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource)
{
writePosition = ProtobufOtlpResourceSerializer.WriteResource(buffer, writePosition, resource);
writePosition = WriteScopeSpans(buffer, writePosition, sdkLimitOptions, scopeTraces);
writePosition = WriteScopeSpans(buffer, writePosition, sdkLimitOptions);

return writePosition;
}

internal static int WriteScopeSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Dictionary<string, List<Activity>> scopeTraces)
internal static int WriteScopeSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions)
{
if (scopeTraces != null)
if (ScopeTracesList != null)
{
foreach (KeyValuePair<string, List<Activity>> entry in scopeTraces)
foreach (KeyValuePair<string, List<Activity>> entry in ScopeTracesList)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.ResourceSpans_Scope_Spans, ProtobufWireType.LEN);
int resourceSpansScopeSpansLengthPosition = writePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer

internal static class ProtobufSerializer
{
private const int MaxBufferSize = 100 * 1024 * 1024;
private const uint UInt128 = 0x80;
private const ulong ULong128 = 0x80;
private const int Fixed32Size = 4;
Expand Down Expand Up @@ -340,6 +341,29 @@ internal static int WriteStringWithTag(byte[] buffer, int writePosition, int fie
return writePosition;
}

internal static bool IncreaseBufferSize(ref byte[] buffer, OtlpSignalType otlpSignalType)
{
if (buffer.Length >= MaxBufferSize)
{
OpenTelemetryProtocolExporterEventSource.Log.BufferExceededMaxSize(otlpSignalType.ToString(), buffer.Length);
return false;
}

try
{
var newBufferSize = buffer.Length * 2;
var newBuffer = new byte[newBufferSize];
buffer.CopyTo(newBuffer, 0);
buffer = newBuffer;
return true;
}
catch (OutOfMemoryException)
{
OpenTelemetryProtocolExporterEventSource.Log.BufferResizeFailedDueToMemory(otlpSignalType.ToString());
return false;
}
}

#if NETFRAMEWORK || NETSTANDARD2_0
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static unsafe ref T GetNonNullPinnableReference<T>(ReadOnlySpan<T> span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,8 @@ internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds)

protected override bool OnSubmitRequestFailure(byte[] request, int contentLength, ExportClientResponse response)
{
if (RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _))
{
byte[]? data = null;

if (data != null)
{
return this.persistentBlobProvider.TryCreateBlob(data, out _);
}
}

return false;
Debug.Assert(request != null, "request was null");
return RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _) && this.persistentBlobProvider.TryCreateBlob(request!, out _);
}

protected override void OnShutdown(int timeoutMilliseconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ namespace OpenTelemetry.Exporter;

internal static class OtlpExporterOptionsExtensions
{
private const string TraceGrpcServicePath = "opentelemetry.proto.collector.trace.v1.TraceService/Export";
private const string MetricsGrpcServicePath = "opentelemetry.proto.collector.metrics.v1.MetricsService/Export";
private const string LogsGrpcServicePath = "opentelemetry.proto.collector.logs.v1.LogsService/Export";

private const string TraceHttpServicePath = "v1/traces";
private const string MetricsHttpServicePath = "v1/metrics";
private const string LogsHttpServicePath = "v1/logs";

#if NETSTANDARD2_1 || NET
public static GrpcChannel CreateChannel(this OtlpExporterOptions options)
#else
Expand Down Expand Up @@ -127,9 +135,9 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
}
}

public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions)
public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options);
var exportClient = GetProtobufExportClient(options, otlpSignalType);

// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
Expand Down Expand Up @@ -157,18 +165,26 @@ public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmiss
}
}

public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOptions options)
public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");

if (options.Protocol == OtlpExportProtocol.Grpc)
{
return new ProtobufOtlpGrpcExportClient(options, httpClient, "opentelemetry.proto.collector.trace.v1.TraceService/Export");
}
else
return otlpSignalType switch
{
return new ProtobufOtlpHttpExportClient(options, httpClient, "v1/traces");
}
OtlpSignalType.Traces => options.Protocol == OtlpExportProtocol.Grpc
? new ProtobufOtlpGrpcExportClient(options, httpClient, TraceGrpcServicePath)
: new ProtobufOtlpHttpExportClient(options, httpClient, TraceHttpServicePath),

OtlpSignalType.Metrics => options.Protocol == OtlpExportProtocol.Grpc
? new ProtobufOtlpGrpcExportClient(options, httpClient, MetricsGrpcServicePath)
: new ProtobufOtlpHttpExportClient(options, httpClient, MetricsHttpServicePath),

OtlpSignalType.Logs => options.Protocol == OtlpExportProtocol.Grpc
? new ProtobufOtlpGrpcExportClient(options, httpClient, LogsGrpcServicePath)
: new ProtobufOtlpHttpExportClient(options, httpClient, LogsHttpServicePath),

_ => throw new NotSupportedException($"OtlpSignalType {otlpSignalType} is not supported."),
};
}

public static OtlpExporterTransmissionHandler<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions)
Expand Down
25 changes: 25 additions & 0 deletions src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpSignalType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Exporter;

/// <summary>
/// Represents the different types of signals that can be exported.
/// </summary>
internal enum OtlpSignalType
{
/// <summary>
/// Represents trace signals.
/// </summary>
Traces = 0,

/// <summary>
/// Represents metric signals.
/// </summary>
Metrics = 1,

/// <summary>
/// Represents log signals.
/// </summary>
Logs = 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ internal ProtobufOtlpLogExporter(
this.experimentalOptions = experimentalOptions!;
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ internal ProtobufOtlpMetricExporter(
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal ProtobufOtlpTraceExporter(

this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions);
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand Down Expand Up @@ -85,13 +85,6 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -106,20 +99,4 @@ protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}

private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}

0 comments on commit f9a0b4c

Please sign in to comment.