Skip to content

Commit

Permalink
[otlp] OTLP Exporter Custom serializer - Log Scopes (open-telemetry#5945
Browse files Browse the repository at this point in the history
)
  • Loading branch information
CodeBlanch authored Oct 31, 2024
1 parent 1fc0ebc commit dc7f1eb
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ internal static class ProtobufOtlpLogSerializer
private static readonly Stack<List<LogRecord>> LogsListPool = [];
private static readonly Dictionary<string, List<LogRecord>> ScopeLogsList = [];

[ThreadStatic]
private static SerializationState? threadSerializationState;

internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
{
foreach (var logRecord in logRecordBatch)
Expand Down Expand Up @@ -95,17 +98,20 @@ internal static int WriteScopeLog(byte[] buffer, int writePosition, SdkLimitOpti

internal static int WriteLogRecord(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, LogRecord logRecord)
{
var attributeValueLengthLimit = sdkLimitOptions.LogRecordAttributeValueLengthLimit;
var attributeCountLimit = sdkLimitOptions.LogRecordAttributeCountLimit ?? int.MaxValue;
var state = threadSerializationState ??= new();

ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
state.AttributeValueLengthLimit = sdkLimitOptions.LogRecordAttributeValueLengthLimit;
state.AttributeCountLimit = sdkLimitOptions.LogRecordAttributeCountLimit ?? int.MaxValue;
state.TagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
{
Buffer = buffer,
WritePosition = writePosition,
TagCount = 0,
DroppedTagCount = 0,
};

ref var otlpTagWriterState = ref state.TagWriterState;

otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.ScopeLogs_Log_Records, ProtobufWireType.LEN);
int logRecordLengthPosition = otlpTagWriterState.WritePosition;
otlpTagWriterState.WritePosition += ReserveSizeForLength;
Expand All @@ -129,20 +135,20 @@ internal static int WriteLogRecord(byte[] buffer, int writePosition, SdkLimitOpt
{
if (logRecord.EventId.Id != default)
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, ExperimentalOptions.LogRecordEventIdAttribute, logRecord.EventId.Id, attributeCountLimit, attributeValueLengthLimit);
AddLogAttribute(state, ExperimentalOptions.LogRecordEventIdAttribute, logRecord.EventId.Id);
}

if (!string.IsNullOrEmpty(logRecord.EventId.Name))
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, ExperimentalOptions.LogRecordEventNameAttribute, logRecord.EventId.Name!, attributeCountLimit, attributeValueLengthLimit);
AddLogAttribute(state, ExperimentalOptions.LogRecordEventNameAttribute, logRecord.EventId.Name!);
}
}

if (logRecord.Exception != null)
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionType, logRecord.Exception.GetType().Name, attributeCountLimit, attributeValueLengthLimit);
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionMessage, logRecord.Exception.Message, attributeCountLimit, attributeValueLengthLimit);
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionStacktrace, logRecord.Exception.ToInvariantString(), attributeCountLimit, attributeValueLengthLimit);
AddLogAttribute(state, SemanticConventions.AttributeExceptionType, logRecord.Exception.GetType().Name);
AddLogAttribute(state, SemanticConventions.AttributeExceptionMessage, logRecord.Exception.Message);
AddLogAttribute(state, SemanticConventions.AttributeExceptionStacktrace, logRecord.Exception.ToInvariantString());
}

bool bodyPopulatedFromFormattedMessage = false;
Expand All @@ -169,7 +175,7 @@ internal static int WriteLogRecord(byte[] buffer, int writePosition, SdkLimitOpt
}
else
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, attribute, attributeCountLimit, attributeValueLengthLimit);
AddLogAttribute(state, attribute);
}
}

Expand All @@ -193,11 +199,19 @@ internal static int WriteLogRecord(byte[] buffer, int writePosition, SdkLimitOpt
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteFixed32WithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Flags, (uint)logRecord.TraceFlags);
}

/*
* TODO: Handle scopes, otlpTagWriterState needs to be passed as ref.
logRecord.ForEachScope(ProcessScope, otlpTagWriterState);
logRecord.ForEachScope(ProcessScope, state);

if (otlpTagWriterState.DroppedTagCount > 0)
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Dropped_Attributes_Count, ProtobufWireType.VARINT);
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)otlpTagWriterState.DroppedTagCount);
}

ProtobufSerializer.WriteReservedLength(otlpTagWriterState.Buffer, logRecordLengthPosition, otlpTagWriterState.WritePosition - (logRecordLengthPosition + ReserveSizeForLength));

return otlpTagWriterState.WritePosition;

void ProcessScope(LogRecordScope scope, ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState)
static void ProcessScope(LogRecordScope scope, SerializationState state)
{
foreach (var scopeItem in scope)
{
Expand Down Expand Up @@ -225,21 +239,10 @@ void ProcessScope(LogRecordScope scope, ProtobufOtlpTagWriter.OtlpTagWriterState
}
else
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, scopeItem, attributeCountLimit, attributeValueLengthLimit);
AddLogAttribute(state, scopeItem);
}
}
}
*/

if (otlpTagWriterState.DroppedTagCount > 0)
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Dropped_Attributes_Count, ProtobufWireType.VARINT);
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)otlpTagWriterState.DroppedTagCount);
}

ProtobufSerializer.WriteReservedLength(otlpTagWriterState.Buffer, logRecordLengthPosition, otlpTagWriterState.WritePosition - (logRecordLengthPosition + ReserveSizeForLength));

return otlpTagWriterState.WritePosition;
}

private static int WriteLogRecordBody(byte[] buffer, int writePosition, ReadOnlySpan<char> value)
Expand All @@ -253,30 +256,35 @@ private static int WriteLogRecordBody(byte[] buffer, int writePosition, ReadOnly
return writePosition;
}

private static ProtobufOtlpTagWriter.OtlpTagWriterState AddLogAttribute(ref ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState, KeyValuePair<string, object?> attribute, int maxAttributeCount, int? maxValueLength)
private static void AddLogAttribute(SerializationState state, KeyValuePair<string, object?> attribute)
{
return AddLogAttribute(ref otlpTagWriterState, attribute.Key, attribute.Value, maxAttributeCount, maxValueLength);
AddLogAttribute(state, attribute.Key, attribute.Value);
}

private static ProtobufOtlpTagWriter.OtlpTagWriterState AddLogAttribute(ref ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState, string key, object? value, int maxAttributeCount, int? maxValueLength)
private static void AddLogAttribute(SerializationState state, string key, object? value)
{
if (otlpTagWriterState.TagCount == maxAttributeCount)
if (state.TagWriterState.TagCount == state.AttributeCountLimit)
{
otlpTagWriterState.DroppedTagCount++;
state.TagWriterState.DroppedTagCount++;
}
else
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Attributes, ProtobufWireType.LEN);
int logAttributesLengthPosition = otlpTagWriterState.WritePosition;
otlpTagWriterState.WritePosition += ReserveSizeForLength;
state.TagWriterState.WritePosition = ProtobufSerializer.WriteTag(state.TagWriterState.Buffer, state.TagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Attributes, ProtobufWireType.LEN);
int logAttributesLengthPosition = state.TagWriterState.WritePosition;
state.TagWriterState.WritePosition += ReserveSizeForLength;

ProtobufOtlpTagWriter.Instance.TryWriteTag(ref otlpTagWriterState, key, value, maxValueLength);
ProtobufOtlpTagWriter.Instance.TryWriteTag(ref state.TagWriterState, key, value, state.AttributeValueLengthLimit);

var logAttributesLength = otlpTagWriterState.WritePosition - (logAttributesLengthPosition + ReserveSizeForLength);
ProtobufSerializer.WriteReservedLength(otlpTagWriterState.Buffer, logAttributesLengthPosition, logAttributesLength);
otlpTagWriterState.TagCount++;
var logAttributesLength = state.TagWriterState.WritePosition - (logAttributesLengthPosition + ReserveSizeForLength);
ProtobufSerializer.WriteReservedLength(state.TagWriterState.Buffer, logAttributesLengthPosition, logAttributesLength);
state.TagWriterState.TagCount++;
}
}

return otlpTagWriterState;
private sealed class SerializationState
{
public int? AttributeValueLengthLimit;
public int AttributeCountLimit;
public ProtobufOtlpTagWriter.OtlpTagWriterState TagWriterState;
}
}
Loading

0 comments on commit dc7f1eb

Please sign in to comment.