Skip to content

Commit

Permalink
Add errorsource to errors (#449)
Browse files Browse the repository at this point in the history
Co-authored-by: Nathan Vērzemnieks <njvrzm@gmail.com>
  • Loading branch information
iwysiu and njvrzm authored Sep 9, 2024
1 parent 4414861 commit dde12ed
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 136 deletions.
6 changes: 4 additions & 2 deletions cspell.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@
"mainstat",
"secondarystat",
"x-ndjson",
"Xtorm",
"Xtorm",
"syslogd",
"gofmt"
"gofmt",
"errorsource",
"exphttpclient"
]
}
6 changes: 4 additions & 2 deletions pkg/opensearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
exphttpclient "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource/httpclient"
"github.com/grafana/opensearch-datasource/pkg/tsdb"
)

Expand All @@ -38,14 +39,15 @@ func NewDatasourceHttpClient(ctx context.Context, ds *backend.DataSourceInstance
return nil, fmt.Errorf("error reading settings: %w", err)
}

httpClientProvider := httpclient.NewProvider()
httpClientOptions, err := ds.HTTPClientOptions(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP client options: %w", err)
}
if settings.OauthPassThru {
httpClientOptions.ForwardHTTPHeaders = true
}
// set the default middlewares from httpclient
httpClientOptions.Middlewares = httpclient.DefaultMiddlewares()

if httpClientOptions.SigV4 != nil {
httpClientOptions.SigV4.Service = "es"
Expand All @@ -55,7 +57,7 @@ func NewDatasourceHttpClient(ctx context.Context, ds *backend.DataSourceInstance
httpClientOptions.Middlewares = append(httpClientOptions.Middlewares, sigV4Middleware())
}

httpClient, err := httpClientProvider.New(httpClientOptions)
httpClient, err := exphttpclient.New(httpClientOptions)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP client: %w", err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/opensearch/lucene_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
"github.com/grafana/opensearch-datasource/pkg/tsdb"
"github.com/grafana/opensearch-datasource/pkg/utils"
Expand Down Expand Up @@ -279,14 +280,17 @@ func (h *luceneHandler) executeQueries(ctx context.Context) (*backend.QueryDataR
return nil, nil
}

response := backend.NewQueryDataResponse()
errRefID := h.queries[0].RefID
req, err := h.ms.Build()
if err != nil {
return nil, err
return errorsource.AddPluginErrorToResponse(errRefID, response, err), nil
}

res, err := h.client.ExecuteMultisearch(ctx, req)
if err != nil {
return nil, err
// We are returning the error containing the source that was added through errorsource.Middleware
return errorsource.AddErrorToResponse(errRefID, response, err), nil
}

rp := newResponseParser(res.Responses, h.queries, res.DebugInfo, h.client.GetConfiguredFields(), h.dsSettings)
Expand Down
52 changes: 21 additions & 31 deletions pkg/opensearch/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package opensearch

import (
"context"
"errors"
"fmt"
"net/http"

"github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
)

Expand Down Expand Up @@ -60,25 +60,25 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer
return nil, err
}

errRefID, err := handleServiceMapPrefetch(ctx, osClient, req)
if err != nil {
return wrapServiceMapPrefetchError(errRefID, err)
response := handleServiceMapPrefetch(ctx, osClient, req)
if response != nil {
return response, nil
}

query := newQueryRequest(osClient, req.Queries, req.PluginContext.DataSourceInstanceSettings)
response, err := wrapError(query.execute(ctx))
response, err = wrapError(query.execute(ctx))
return response, err
}

// handleServiceMapPrefetch inspects the given request, and, if it wants a serviceMap, creates and
// calls the Prefetch query to get the services and operations lists that are required for
// the associated Stats query. It then adds these parameters to the originating query so
// the Stats query can be created later.
func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *backend.QueryDataRequest) (string, error) {
// the Stats query can be created later. Returns a response with an error if the request fails.
func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *backend.QueryDataRequest) *backend.QueryDataResponse {
for i, query := range req.Queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return "", err
return wrapServiceMapPrefetchError(query.RefID, err)
}
queryType := model.Get("queryType").MustString()
luceneQueryType := model.Get("luceneQueryType").MustString()
Expand All @@ -88,7 +88,9 @@ func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *
q := newQueryRequest(osClient, []backend.DataQuery{prefetchQuery}, req.PluginContext.DataSourceInstanceSettings)
response, err := q.execute(ctx)
if err != nil {
return query.RefID, err
return wrapServiceMapPrefetchError(query.RefID, err)
} else if response.Responses[query.RefID].Error != nil {
return wrapServiceMapPrefetchError(query.RefID, response.Responses[query.RefID].Error)
}
services, operations := extractParametersFromServiceMapFrames(response)

Expand All @@ -99,41 +101,29 @@ func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *
// An error here _should_ be impossible but since services and operations are coming from outside,
// handle it just in case
if err != nil {
return query.RefID, err
return wrapServiceMapPrefetchError(query.RefID, err)
}
req.Queries[i].JSON = newJson
return "", nil
return nil
}
}
return "", nil
return nil
}

func wrapServiceMapPrefetchError(refId string, err error) (*backend.QueryDataResponse, error) {
if refId != "" {
return &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
refId: {
Error: fmt.Errorf(`Error fetching service map info: %w`, err),
}},
}, nil
func wrapServiceMapPrefetchError(refId string, err error) *backend.QueryDataResponse {
if err != nil {
response := backend.NewQueryDataResponse()
err = errorsource.PluginError(err, false) // keeps downstream source if present
err = fmt.Errorf(`Error fetching service map info: %w`, err)
return errorsource.AddErrorToResponse(refId, response, err)
}
return nil, err
return nil
}

func wrapError(response *backend.QueryDataResponse, err error) (*backend.QueryDataResponse, error) {
var invalidQueryTypeError invalidQueryTypeError
if errors.As(err, &invalidQueryTypeError) {
return &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
invalidQueryTypeError.refId: {
Error: fmt.Errorf(`%w, expected Lucene or PPL`, err),
}},
}, nil
}
if err != nil {
return response, fmt.Errorf("OpenSearch data source error: %w", err)
}

return response, err
}

Expand Down
63 changes: 23 additions & 40 deletions pkg/opensearch/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,7 @@ import (
)

func Test_wrapError(t *testing.T) {
t.Run("wrapError intercepts an invalidQueryTypeError and returns a data response with a wrapped error", func(t *testing.T) {
wrappedInvalidQueryTypeError := fmt.Errorf("%q is %w",
"wrong queryType",
invalidQueryTypeError{refId: "some ref id"})

actualResponse, err := wrapError(nil, wrappedInvalidQueryTypeError)

assert.NoError(t, err)
assert.Equal(t, &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
"some ref id": {
Error: fmt.Errorf(`%w, expected Lucene or PPL`, wrappedInvalidQueryTypeError)}},
}, actualResponse)
})

t.Run("wrapError passes on any other type of error and states it's from OpenSearch data source", func(t *testing.T) {
t.Run("wrapError passes on an error and states it's from OpenSearch data source", func(t *testing.T) {
_, err := wrapError(&backend.QueryDataResponse{}, fmt.Errorf("some error"))

assert.Error(t, err)
Expand All @@ -37,24 +22,18 @@ func Test_wrapError(t *testing.T) {
}

func Test_wrapServiceMapPrefetchError(t *testing.T) {
t.Run("wrapServiceMapPrefetchError returns a response if a refId is passed", func(t *testing.T) {
t.Run("wrapServiceMapPrefetchError wraps the error in a response", func(t *testing.T) {
prefetchError := fmt.Errorf("Some prefetch error")
actualResponse, err := wrapServiceMapPrefetchError("some ref id", prefetchError)
actualResponse := wrapServiceMapPrefetchError("some ref id", prefetchError)

assert.NoError(t, err)
assert.Equal(t, &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
"some ref id": {
Error: fmt.Errorf(`Error fetching service map info: %w`, prefetchError)}},
}, actualResponse)
assert.NotNil(t, actualResponse)
assert.Equal(t, backend.ErrorSourcePlugin, actualResponse.Responses["some ref id"].ErrorSource)
assert.Equal(t, fmt.Sprintf(`Error fetching service map info: %s`, prefetchError), actualResponse.Responses["some ref id"].Error.Error())
})

t.Run("wrapServiceMapPrefetchError passes the error if there is no refId", func(t *testing.T) {
prefetchError := fmt.Errorf("Some prefetch error")
_, err := wrapServiceMapPrefetchError("", prefetchError)

assert.Error(t, err)
assert.Equal(t, "Some prefetch error", err.Error())
t.Run("wrapServiceMapPrefetchError returns nil if error is nil", func(t *testing.T) {
response := wrapServiceMapPrefetchError("", nil)
assert.Nil(t, response)
})
}

Expand Down Expand Up @@ -87,12 +66,13 @@ func TestServiceMapPreFetch(t *testing.T) {
}

testCases := []struct {
name string
queries []tsdbQuery
response *client.MultiSearchResponse
expectedError error
shouldEditQuery bool
expectedQueryJson string
name string
queries []tsdbQuery
response *client.MultiSearchResponse
expectedError error
expectedErrorSource backend.ErrorSource
shouldEditQuery bool
expectedQueryJson string
}{
{
name: "no service map query",
Expand Down Expand Up @@ -145,7 +125,8 @@ func TestServiceMapPreFetch(t *testing.T) {
response: &client.MultiSearchResponse{
Responses: errResponse,
},
expectedError: fmt.Errorf("foo"),
expectedError: fmt.Errorf("Error fetching service map info: foo"),
expectedErrorSource: backend.ErrorSourceDownstream,
},
}

Expand All @@ -156,13 +137,15 @@ func TestServiceMapPreFetch(t *testing.T) {
req := backend.QueryDataRequest{
Queries: createDataQueriesForTests(tc.queries),
}
_, err := handleServiceMapPrefetch(context.Background(), c, &req)
response := handleServiceMapPrefetch(context.Background(), c, &req)
if tc.expectedError != nil {
require.Equal(t, tc.expectedError, err)
require.NotNil(t, response)
require.Equal(t, tc.expectedErrorSource, response.Responses["A"].ErrorSource)
require.Equal(t, tc.expectedError.Error(), response.Responses["A"].Error.Error())
return
}
require.NoError(t, err)

require.Nil(t, response)
if tc.shouldEditQuery {
assert.Equal(t, tc.expectedQueryJson, string(req.Queries[0].JSON))
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/opensearch/ppl_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
client "github.com/grafana/opensearch-datasource/pkg/opensearch/client"
)

Expand Down Expand Up @@ -40,11 +41,12 @@ func (h *pplHandler) executeQueries(ctx context.Context) (*backend.QueryDataResp
for refID, builder := range h.builders {
req, err := builder.Build()
if err != nil {
return nil, err
return errorsource.AddPluginErrorToResponse(refID, result, err), nil
}
res, err := h.client.ExecutePPLQuery(ctx, req)
if err != nil {
return nil, err
// We are returning the error containing the source that was added through errorsource.Middlewares
return errorsource.AddErrorToResponse(refID, result, err), nil
}

query := h.queries[refID]
Expand Down
26 changes: 14 additions & 12 deletions pkg/opensearch/ppl_response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
simplejson "github.com/bitly/go-simplejson"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/opensearch-datasource/pkg/null"
"github.com/grafana/opensearch-datasource/pkg/opensearch/client"
"github.com/grafana/opensearch-datasource/pkg/utils"
Expand Down Expand Up @@ -37,16 +38,14 @@ func (rp *pplResponseParser) parseResponse(configuredFields client.ConfiguredFie
}

if rp.Response.Error != nil {
return &backend.DataResponse{
Error: getErrorFromPPLResponse(rp.Response),
Frames: []*data.Frame{
{
Meta: &data.FrameMeta{
Custom: debugInfo,
},
errResp := errorsource.Response(errorsource.DownstreamError(getErrorFromPPLResponse(rp.Response), false))
errResp.Frames = []*data.Frame{
{
Meta: &data.FrameMeta{
Custom: debugInfo,
},
},
}, nil
}}
return &errResp, nil
}

queryRes := &backend.DataResponse{
Expand Down Expand Up @@ -91,7 +90,8 @@ func (rp *pplResponseParser) parsePPLResponse(queryRes *backend.DataResponse, co
}
ts, err := rp.parseTimestamp(row[fieldIdx], timestampFormat)
if err != nil {
return nil, err
errResp := errorsource.Response(errorsource.PluginError(err, false))
return &errResp, nil
}
value = *utils.NullFloatToNullableTime(ts)
}
Expand Down Expand Up @@ -139,7 +139,8 @@ func (rp *pplResponseParser) parsePPLResponse(queryRes *backend.DataResponse, co
func (rp *pplResponseParser) parseTimeSeries(queryRes *backend.DataResponse) (*backend.DataResponse, error) {
t, err := getTimeSeriesResponseMeta(rp.Response.Schema)
if err != nil {
return nil, err
errResp := errorsource.Response(errorsource.PluginError(err, false))
return &errResp, nil
}

valueName := rp.getSeriesName(t.valueIndex)
Expand All @@ -152,7 +153,8 @@ func (rp *pplResponseParser) parseTimeSeries(queryRes *backend.DataResponse) (*b
for i, datarow := range rp.Response.Datarows {
err := rp.addDatarow(newFrame, i, datarow, t)
if err != nil {
return nil, err
errResp := errorsource.Response(errorsource.PluginError(err, false))
return &errResp, nil
}
}

Expand Down
Loading

0 comments on commit dde12ed

Please sign in to comment.