Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation: Improve automatic instrumentation by the SDK to include handler/request logs, metrics and traces #1028

Merged
merged 9 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions backend/adapter_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package backend

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type handlerWrapperFunc func(ctx context.Context) (RequestStatus, error)

func setupContext(ctx context.Context, endpoint Endpoint) context.Context {
ctx = WithEndpoint(ctx, endpoint)
ctx = propagateTenantIDIfPresent(ctx)

return ctx
}

func wrapHandler(ctx context.Context, pluginCtx PluginContext, next handlerWrapperFunc) error {
ctx = setupHandlerContext(ctx, pluginCtx)
wrapper := errorWrapper(next)
wrapper = logWrapper(wrapper)
wrapper = metricWrapper(wrapper)
wrapper = tracingWrapper(wrapper)
_, err := wrapper(ctx)
return err
}

func setupHandlerContext(ctx context.Context, pluginCtx PluginContext) context.Context {
ctx = initErrorSource(ctx)
ctx = WithGrafanaConfig(ctx, pluginCtx.GrafanaConfig)
ctx = WithPluginContext(ctx, pluginCtx)
ctx = WithUser(ctx, pluginCtx.User)
ctx = withContextualLogAttributes(ctx, pluginCtx)
ctx = WithUserAgent(ctx, pluginCtx.UserAgent)
return ctx
}

func errorWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
status, err := next(ctx)

if err != nil && IsDownstreamError(err) {
if innerErr := WithDownstreamErrorSource(ctx); innerErr != nil {
return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err))
}
}

return status, err
}
}

var pluginRequestCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "grafana_plugin",
Name: "request_total",
Help: "The total amount of plugin requests",
}, []string{"endpoint", "status", "status_source"})

var once = sync.Once{}

func metricWrapper(next handlerWrapperFunc) handlerWrapperFunc {
once.Do(func() {
prometheus.MustRegister(pluginRequestCounter)
})

return func(ctx context.Context) (RequestStatus, error) {
endpoint := EndpointFromContext(ctx)
status, err := next(ctx)

pluginRequestCounter.WithLabelValues(endpoint.String(), status.String(), string(errorSourceFromContext(ctx))).Inc()

return status, err
}
}

func tracingWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
endpoint := EndpointFromContext(ctx)
pluginCtx := PluginConfigFromContext(ctx)
ctx, span := tracing.DefaultTracer().Start(ctx, fmt.Sprintf("sdk.%s", endpoint), trace.WithAttributes(
attribute.String("plugin_id", pluginCtx.PluginID),
attribute.Int64("org_id", pluginCtx.OrgID),
))
defer span.End()

if pluginCtx.DataSourceInstanceSettings != nil {
span.SetAttributes(
attribute.String("datasource_name", pluginCtx.DataSourceInstanceSettings.Name),
attribute.String("datasource_uid", pluginCtx.DataSourceInstanceSettings.UID),
)
}

if u := pluginCtx.User; u != nil {
span.SetAttributes(attribute.String("user", pluginCtx.User.Name))
}

status, err := next(ctx)

span.SetAttributes(
attribute.String("request_status", status.String()),
attribute.String("status_source", string(errorSourceFromContext(ctx))),
)

if err != nil {
return status, tracing.Error(span, err)
}

return status, err
}
}

func logWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
start := time.Now()
status, err := next(ctx)

logParams := []any{
"status", status.String(),
"duration", time.Since(start),
}

if err != nil {
logParams = append(logParams, "error", err)
}

logParams = append(logParams, "statusSource", string(errorSourceFromContext(ctx)))

ctxLogger := Logger.FromContext(ctx)
logFunc := ctxLogger.Debug
if status > RequestStatusOK {
logFunc = ctxLogger.Error
}

logFunc("Plugin Request Completed", logParams...)

return status, err
}
}

func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context {
if len(headers) > 0 {
ctx = httpclient.WithContextualMiddleware(ctx,
httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
if !opts.ForwardHTTPHeaders {
return next
}

return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) {
// Only set a header if it is not already set.
for k, v := range headers {
if qreq.Header.Get(k) == "" {
for _, vv := range v {
qreq.Header.Add(k, vv)
}
}
}
return next.RoundTrip(qreq)
})
}))
}
return ctx
}
37 changes: 37 additions & 0 deletions backend/adapter_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package backend

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestErrorWrapper(t *testing.T) {
t.Run("No downstream error should not set downstream error source in context", func(t *testing.T) {
ctx := initErrorSource(context.Background())

actualErr := errors.New("BOOM")
wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) {
return RequestStatusError, actualErr
})
status, err := wrapper(ctx)
require.ErrorIs(t, err, actualErr)
require.Equal(t, RequestStatusError, status)
require.Equal(t, DefaultErrorSource, errorSourceFromContext(ctx))
})

t.Run("Downstream error should set downstream error source in context", func(t *testing.T) {
ctx := initErrorSource(context.Background())

actualErr := errors.New("BOOM")
wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) {
return RequestStatusError, DownstreamError(actualErr)
})
status, err := wrapper(ctx)
require.ErrorIs(t, err, actualErr)
require.Equal(t, RequestStatusError, status)
require.Equal(t, ErrorSourceDownstream, errorSourceFromContext(ctx))
})
}
51 changes: 27 additions & 24 deletions backend/admission_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,52 @@ func newAdmissionSDKAdapter(handler AdmissionHandler) *admissionSDKAdapter {
}

func (a *admissionSDKAdapter) ValidateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.ValidationResponse, error) {
ctx = WithEndpoint(ctx, EndpointValidateAdmission)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointValidateAdmission)
parsedReq := FromProto().AdmissionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.ValidateAdmission(ctx, parsedReq)

var resp *ValidationResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.ValidateAdmission(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().ValidationResponse(resp), nil
}

func (a *admissionSDKAdapter) MutateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.MutationResponse, error) {
ctx = WithEndpoint(ctx, EndpointMutateAdmission)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointMutateAdmission)
parsedReq := FromProto().AdmissionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.MutateAdmission(ctx, parsedReq)

var resp *MutationResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.MutateAdmission(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().MutationResponse(resp), nil
}

func (a *admissionSDKAdapter) ConvertObject(ctx context.Context, req *pluginv2.ConversionRequest) (*pluginv2.ConversionResponse, error) {
ctx = WithEndpoint(ctx, EndpointConvertObject)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointConvertObject)
parsedReq := FromProto().ConversionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.ConvertObject(ctx, parsedReq)

var resp *ConversionResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.ConvertObject(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().ConversionResponse(resp), nil
}
4 changes: 4 additions & 0 deletions backend/convert_to_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func (t ConvertToProtobuf) QueryDataRequest(req *QueryDataRequest) *pluginv2.Que
// It will set the RefID on the frames to the RefID key in Responses if a Frame's
// RefId property is an empty string.
func (t ConvertToProtobuf) QueryDataResponse(res *QueryDataResponse) (*pluginv2.QueryDataResponse, error) {
if res == nil {
return nil, nil
}
marefr marked this conversation as resolved.
Show resolved Hide resolved

pQDR := &pluginv2.QueryDataResponse{
Responses: make(map[string]*pluginv2.DataResponse, len(res.Responses)),
}
Expand Down
74 changes: 39 additions & 35 deletions backend/data_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package backend

import (
"context"
"net/http"
"errors"
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)

Expand All @@ -19,41 +19,45 @@ func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter {
}
}

func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context {
if len(headers) > 0 {
ctx = httpclient.WithContextualMiddleware(ctx,
httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
if !opts.ForwardHTTPHeaders {
return next
}

return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) {
// Only set a header if it is not already set.
for k, v := range headers {
if qreq.Header.Get(k) == "" {
for _, vv := range v {
qreq.Header.Add(k, vv)
}
}
}
return next.RoundTrip(qreq)
})
}))
}
return ctx
}

func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) {
ctx = WithEndpoint(ctx, EndpointQueryData)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointQueryData)
parsedReq := FromProto().QueryDataRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.queryDataHandler.QueryData(ctx, parsedReq)

var resp *QueryDataResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
var innerErr error
resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq)

if resp == nil || len(resp.Responses) == 0 {
return RequestStatusFromError(innerErr), innerErr
}

// Set downstream status source in the context if there's at least one response with downstream status source,
// and if there's no plugin error
var hasPluginError bool
var hasDownstreamError bool
for _, r := range resp.Responses {
if r.Error == nil {
continue
}
if r.ErrorSource == ErrorSourceDownstream {
hasDownstreamError = true
} else {
hasPluginError = true
}
}

// A plugin error has higher priority than a downstream error,
// so set to downstream only if there's no plugin error
if hasDownstreamError && !hasPluginError {
if err := WithDownstreamErrorSource(ctx); err != nil {
return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err))
}
}

return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}
Expand Down
Loading