Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce OpenTelemetry #2358

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

MahmoudSamir101
Copy link

@MahmoudSamir101 MahmoudSamir101 commented Sep 22, 2022

Hello,

This PR was inspired by a previous discussion on gitier about introducing OpenTelemetry in Marten project.

The PR has primarily divided the functionalities between two libraries: the core library (Marten), which is responsible for emitting diagnostics events during runtime to describe Marten internal infrastructure.

The second library (Marten.OpenTelemetry) is in charge of developing the OpenTelemetry instrumentation by subscribing to diagnostic events and creating activities (traces & spans) with tags and events.

The current scope of the PR covers Streams, to agree on the design for new proposed APIs and after that, it will cover the remaining components of Marten such as aggregates, projections, daemon, and so on.

Proposed APIS:

  • Marten Library Diagnostics Apis :
  1. DiagnosticSource : writes diagnostic events to subscribers, it is a generic class to categorize the same functionalities with the same type, such as DiagnositcSource for Stream, another one for Aggregate or Project, the generic type will be used as the clue for extensions methods that will be created for specific DiagnosticSources, the extension methods will be used through various places in Marten library to provide the proper diagnostic data for each DiagnositcSource
public interface IDiagnosticSource<TCategory>
    where TCategory : DiagnosticCategory<TCategory>, new()
{
    void Write(DiagnosticEventBase eventData);
}

public class DiagnosticSource<TCategory>: IDiagnosticSource<TCategory>
    where TCategory : DiagnosticCategory<TCategory>, new()
{
    public void Write(DiagnosticEventBase eventData)
    {
        if (Listener.Instance.IsEnabled(eventData.EventId.Name))
        {
            Listener.Instance.Write(eventData.EventId.Name, eventData);
        }
    }
}

2.DiagnosticCategory : defines diagnostics categories that group some diagnostic events, such as Stream, Aggregate, and Project through declaring extension methods for IDiagnosticSource<DiagnosticCategory.[Type]>

public static class DiagnosticCategory
{
    public const string Name = "Marten";

    public class Stream: DiagnosticCategory<Stream> { }
    public class Projection: DiagnosticCategory<Projection> { }
}

public static class EventStoreDiagnosticExtensions
{
    public static void StartStream(this IDiagnosticSource<DiagnosticCategory.Stream> diagnostic, StreamAction streamAction, 
string? correlationId)
    {
        StreamCreatedDiagnosticEvent @event = new(streamAction)
        {
            CorrelationId = correlationId
        };
        diagnostic.Write(@event);
    }
}

  1. DiagnosticEventId : declares diagnostic events by creating EventId with predefined Id and Name for that event, the EventId will be used to name the corresponding Activity/Trace
public static class DiagnosticEventId
{
    public static readonly EventId StreamCreated = MakeStreamId(Id.StreamCreated);
    public static readonly EventId StreamAppended = MakeStreamId(Id.StreamAppended);
    public static readonly EventId StreamChangesCompleted = MakeStreamId(Id.StreamChangesCompleted);
    public static readonly EventId StreamChangesFailed = MakeStreamId(Id.StreamChangesFailed);
}

  1. DiagnosticEventBase : base class for declaring diagnostic event with EventId for Activity/Trace name, DisplayName to give more description for that event (used by OpenTelemetry exportes) and CorrelationId for grouping these events in one parent Activity/Trace , the CorrelationId value will be set through Session.
public class DiagnosticEventBase
{
    public DiagnosticEventBase(EventId eventId)
    {
        EventId = eventId;
        DisplayName = EventId.Name;
    }

    public EventId EventId { get; }
    public string? CorrelationId { get; set; }
    public virtual string? DisplayName { get; }
}

//DiagnosticEvent when Stream Created 
public class StreamCreatedDiagnosticEvent: StreamBaseDiagnosticEvent
{
    public StreamCreatedDiagnosticEvent(StreamAction streamAction)
        : base(streamAction, DiagnosticEventId.StreamCreated)
    {
        DisplayName = $"Create Stream [{StreamAction.AggregateType.Name}]";
    }

    public override string DisplayName { get; }
}

So based on above design the DiagnosticSource of specific DiagnosticCategory (Stream) will write diagnostic events (StreamCreated, StreamAppended,..) to DiagnosticListeners that will be created by the Marten.OpenTelemetry library or even any other listeners for any use case, the diagnostic events will be created using extension methods that will extend this DiagnosticSource, these extension methods will be called inside the Marten infrastrcure code base such as , the EventStore.Stream class will used to invoke Stream DiagnosticSource extension methods

 internal partial class EventStore
{
   //omit for brevity
  public StreamAction StartStream(Type aggregateType, Guid id, params object[] events)
  {
            var stream = _store.Events.StartStream(_session, id, events);
            stream.AggregateType = aggregateType;
/* Stream DiagnosticSource invoke StartStream extension method  to create StreamCreated diagnostic event and 
write that for subscribed listeners */
            StreamDiagnosticSource.Instance.StartStream(stream, _session.CorrelationId); 
            return stream;
   }
}
  • Marten.OpenTelemetry :
    The library provides the blocks that build Marten OpenTelemetry instrumentation, that will be hooked to the OpenTelemetry SDK AddOpenTelemetryTracing method to instrument Marten diagnostic events in the shape of Activities/Traces to exporters
  1. TraceProviderBuilderExtensions : creates and adds instrumentation instance to TraceProviderBuilder
 public static TracerProviderBuilder AddMartenInstrumentation(this TracerProviderBuilder builder, Action<MartenInstrumentationOptions>? configure = null)
    {
        MartenInstrumentationOptions options = new();
        configure?.Invoke(options);
        builder.AddSource(MartenListener.SourceName);
        builder.AddInstrumentation(() => new OpenTelemetryInstrumentation(options));
        return builder;
    }

  1. OpenTelemetryInstrumentation : creates library listener that subscribes to Marten diagnostic events
internal class OpenTelemetryInstrumentation: IDisposable
{
    private readonly DiagnosticSourceSubscriber subscriber;
    public OpenTelemetryInstrumentation(MartenInstrumentationOptions options)
    {
        options ??= new();
        MartenListener listener = new();
        subscriber = new DiagnosticSourceSubscriber(
            name => listener,
            listener => listener.Name == DiagnosticCategory.Name,
            null);

        subscriber.Subscribe();
    }
    public void Dispose()
    {
        subscriber.Dispose();
    }
}

  1. MartenListener: Core component for building the Activity/Trace chain, subscribes to diagnostic events and read diagnostic data, converts them to proper OpenTelemetry tags, attaches events (ES Events) to Activity/Trace as an Activity Event, and creates correlation activity for events that share the same CorrelationId

@MahmoudSamir101
Copy link
Author

I created a sample project to show the result for instrumentation, below are some screenshots from Jaeger as an OpenTelemetry exporter

image

1

2

3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant