Skip to content

Commit 39a8d3c

Browse files
authored
QueryErrorMutator interface (#190)
* remove pgx errors and add query error mutator interface * remove empty test * add retry logic back with tests * add to change log * this is actually a breaking change * this is actually a breaking change * add titles like other change log lines
1 parent c579137 commit 39a8d3c

File tree

7 files changed

+246
-223
lines changed

7 files changed

+246
-223
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [5.0.0]
9+
10+
### Added
11+
- Added QueryErrorMutator (#190)
12+
13+
### Changed
14+
- **Breaking change** IsPGXConnectionError has been removed. Removed PGX v5/PostgreSQL-specific error detection (#190)
15+
816
## [4.2.7]
917

1018
### Changed

datasource.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,17 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
225225
args = argSetter.SetQueryArgs(ctx, headers)
226226
}
227227

228+
var queryErrorMutator QueryErrorMutator
229+
if mutator, ok := ds.driver().(QueryErrorMutator); ok {
230+
queryErrorMutator = mutator
231+
}
232+
228233
// FIXES:
229234
// * Some datasources (snowflake) expire connections or have an authentication token that expires if not used in 1 or 4 hours.
230235
// Because the datasource driver does not include an option for permanent connections, we retry the connection
231236
// if the query fails. NOTE: this does not include some errors like "ErrNoRows"
232237
dbQuery := NewQuery(dbConn.db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
233-
res, err := dbQuery.Run(ctx, q, args...)
238+
res, err := dbQuery.Run(ctx, q, queryErrorMutator, args...)
234239
if err == nil {
235240
return res, nil
236241
}
@@ -256,7 +261,7 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
256261
}
257262

258263
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
259-
res, err = dbQuery.Run(ctx, q, args...)
264+
res, err = dbQuery.Run(ctx, q, queryErrorMutator, args...)
260265
if err == nil {
261266
return res, err
262267
}
@@ -286,7 +291,7 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
286291
}
287292

288293
dbQuery := NewQuery(db, dbConn.settings, ds.driver().Converters(), fillMode, ds.rowLimit)
289-
res, err = dbQuery.Run(ctx, q, args...)
294+
res, err = dbQuery.Run(ctx, q, queryErrorMutator, args...)
290295
if err == nil {
291296
return res, err
292297
}

driver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ type QueryArgSetter interface {
7373
type ResponseMutator interface {
7474
MutateResponse(ctx context.Context, res data.Frames) (data.Frames, error)
7575
}
76+
77+
type QueryErrorMutator interface {
78+
MutateQueryError(err error) backend.ErrorWithSource
79+
}

errors.go

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package sqlds
22

33
import (
44
"errors"
5-
"strings"
65

76
"github.com/grafana/grafana-plugin-sdk-go/backend"
87
)
@@ -22,8 +21,6 @@ var (
2221
ErrorRowValidation = errors.New("SQL rows validation failed")
2322
// ErrorConnectionClosed is returned when the database connection is unexpectedly closed
2423
ErrorConnectionClosed = errors.New("database connection closed")
25-
// ErrorPGXLifecycle is returned for PGX v5 specific connection lifecycle issues
26-
ErrorPGXLifecycle = errors.New("PGX connection lifecycle error")
2724
)
2825

2926
func ErrorSource(err error) backend.ErrorSource {
@@ -32,57 +29,3 @@ func ErrorSource(err error) backend.ErrorSource {
3229
}
3330
return backend.ErrorSourcePlugin
3431
}
35-
36-
// IsPGXConnectionError checks if an error is related to PGX v5 connection issues
37-
func IsPGXConnectionError(err error) bool {
38-
if err == nil {
39-
return false
40-
}
41-
42-
errStr := strings.ToLower(err.Error())
43-
pgxConnectionErrors := []string{
44-
"connection closed",
45-
"connection reset",
46-
"connection refused",
47-
"broken pipe",
48-
"eof",
49-
"context canceled",
50-
"context deadline exceeded",
51-
"pgconn",
52-
"conn is closed",
53-
"bad connection",
54-
}
55-
56-
for _, pgxErr := range pgxConnectionErrors {
57-
if strings.Contains(errStr, pgxErr) {
58-
return true
59-
}
60-
}
61-
62-
return false
63-
}
64-
65-
// ClassifyError determines the appropriate error source and type for SQL errors
66-
func ClassifyError(err error) (backend.ErrorSource, error) {
67-
if err == nil {
68-
return backend.ErrorSourcePlugin, nil
69-
}
70-
71-
// Check for PGX v5 specific connection errors
72-
if IsPGXConnectionError(err) {
73-
// These are typically downstream connection issues
74-
return backend.ErrorSourceDownstream, ErrorPGXLifecycle
75-
}
76-
77-
// Check for row validation errors
78-
if errors.Is(err, ErrorRowValidation) {
79-
return backend.ErrorSourceDownstream, err
80-
}
81-
82-
// Default to existing logic
83-
if backend.IsDownstreamError(err) {
84-
return backend.ErrorSourceDownstream, err
85-
}
86-
87-
return backend.ErrorSourcePlugin, err
88-
}

query.go

Lines changed: 44 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -74,55 +74,51 @@ func NewQuery(db Connection, settings backend.DataSourceInstanceSettings, conver
7474
}
7575

7676
// Run sends the query to the connection and converts the rows to a dataframe.
77-
func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (data.Frames, error) {
77+
func (q *DBQuery) Run(ctx context.Context, query *Query, queryErrorMutator QueryErrorMutator, args ...interface{}) (data.Frames, error) {
7878
start := time.Now()
7979
rows, err := q.DB.QueryContext(ctx, query.RawSQL, args...)
8080
if err != nil {
81-
// Determine error source based on retry configuration and error type
82-
errSource := backend.ErrorSourcePlugin
83-
errType := ErrorQuery
81+
var errWithSource backend.ErrorWithSource
82+
defer func() {
83+
q.metrics.CollectDuration(Source(errWithSource.ErrorSource()), StatusError, time.Since(start).Seconds())
84+
}()
8485

8586
if errors.Is(err, context.Canceled) {
86-
errType = context.Canceled
87-
errSource = backend.ErrorSourceDownstream
88-
} else if IsPGXConnectionError(err) {
89-
errType = ErrorPGXLifecycle
90-
errSource = backend.ErrorSourceDownstream
91-
} else {
92-
// Use enhanced error classification for PGX v5
93-
errSource, _ = ClassifyError(err)
87+
errWithSource := backend.NewErrorWithSource(err, backend.ErrorSourceDownstream)
88+
return sqlutil.ErrorFrameFromQuery(query), errWithSource
9489
}
9590

96-
var errWithSource error
97-
if errSource == backend.ErrorSourceDownstream {
98-
errWithSource = backend.DownstreamError(fmt.Errorf("%w: %s", errType, err.Error()))
99-
} else {
100-
errWithSource = backend.PluginError(fmt.Errorf("%w: %s", errType, err.Error()))
91+
// Wrap with ErrorQuery to enable retry logic in datasource
92+
queryErr := fmt.Errorf("%w: %w", ErrorQuery, err)
93+
94+
// Handle driver specific errors
95+
if queryErrorMutator != nil {
96+
errWithSource = queryErrorMutator.MutateQueryError(queryErr)
97+
return sqlutil.ErrorFrameFromQuery(query), errWithSource
10198
}
10299

103-
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
100+
// If we get to this point, assume the error is from the plugin
101+
errWithSource = backend.NewErrorWithSource(queryErr, backend.DefaultErrorSource)
102+
104103
return sqlutil.ErrorFrameFromQuery(query), errWithSource
105104
}
106105
q.metrics.CollectDuration(SourceDownstream, StatusOK, time.Since(start).Seconds())
107106

108107
// Check for an error response
109108
if err := rows.Err(); err != nil {
109+
queryErr := fmt.Errorf("%w: %w", ErrorQuery, err)
110+
errWithSource := backend.NewErrorWithSource(queryErr, backend.DefaultErrorSource)
110111
if errors.Is(err, sql.ErrNoRows) {
111112
// Should we even response with an error here?
112113
// The panel will simply show "no data"
113-
errWithSource := backend.DownstreamError(fmt.Errorf("%s: %w", "No results from query", err))
114+
errWithSource = backend.NewErrorWithSource(fmt.Errorf("%w: %s", err, "Error response from database"), backend.ErrorSourceDownstream)
114115
return sqlutil.ErrorFrameFromQuery(query), errWithSource
115116
}
116-
117-
errSource, _ := ClassifyError(err)
118-
var errWithSource error
119-
if errSource == backend.ErrorSourceDownstream {
120-
errWithSource = backend.DownstreamError(fmt.Errorf("%s: %w", "Error response from database", err))
121-
} else {
122-
errWithSource = backend.PluginError(fmt.Errorf("%s: %w", "Error response from database", err))
117+
if queryErrorMutator != nil {
118+
errWithSource = queryErrorMutator.MutateQueryError(queryErr)
123119
}
124120

125-
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
121+
q.metrics.CollectDuration(Source(errWithSource.ErrorSource()), StatusError, time.Since(start).Seconds())
126122
return sqlutil.ErrorFrameFromQuery(query), errWithSource
127123
}
128124

@@ -132,23 +128,34 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
132128
}
133129
}()
134130

135-
start = time.Now()
136-
// Convert the response to frames
131+
return q.convertRowsToFrames(rows, query, queryErrorMutator)
132+
}
133+
134+
func (q *DBQuery) convertRowsToFrames(rows *sql.Rows, query *Query, queryErrorMutator QueryErrorMutator) (data.Frames, error) {
135+
source := SourcePlugin
136+
status := StatusOK
137+
start := time.Now()
138+
defer func() {
139+
q.metrics.CollectDuration(source, status, time.Since(start).Seconds())
140+
}()
141+
137142
res, err := getFrames(rows, q.rowLimit, q.converters, q.fillMode, query)
138143
if err != nil {
139-
errSource, _ := ClassifyError(err)
144+
status = StatusError
140145

141146
// Additional checks for processing errors
142-
if backend.IsDownstreamHTTPError(err) || isProcessingDownstreamError(err) {
143-
errSource = backend.ErrorSourceDownstream
147+
if backend.IsDownstreamHTTPError(err) {
148+
source = SourceDownstream
149+
} else if queryErrorMutator != nil {
150+
errWithSource := queryErrorMutator.MutateQueryError(err)
151+
source = Source(errWithSource.ErrorSource())
144152
}
145153

146-
errWithSource := backend.NewErrorWithSource(fmt.Errorf("%w: %s", err, "Could not process SQL results"), errSource)
147-
q.metrics.CollectDuration(Source(errSource), StatusError, time.Since(start).Seconds())
148-
return sqlutil.ErrorFrameFromQuery(query), errWithSource
154+
return sqlutil.ErrorFrameFromQuery(query), backend.NewErrorWithSource(
155+
fmt.Errorf("%w: %s", err, "Could not process SQL results"),
156+
backend.ErrorSource(source),
157+
)
149158
}
150-
151-
q.metrics.CollectDuration(SourcePlugin, StatusOK, time.Since(start).Seconds())
152159
return res, nil
153160
}
154161

@@ -306,26 +313,3 @@ func applyHeaders(query *Query, headers http.Header) *Query {
306313

307314
return query
308315
}
309-
310-
func isProcessingDownstreamError(err error) bool {
311-
downstreamErrors := []error{
312-
data.ErrorInputFieldsWithoutRows,
313-
data.ErrorSeriesUnsorted,
314-
data.ErrorNullTimeValues,
315-
ErrorRowValidation,
316-
ErrorConnectionClosed,
317-
ErrorPGXLifecycle,
318-
}
319-
for _, e := range downstreamErrors {
320-
if errors.Is(err, e) {
321-
return true
322-
}
323-
}
324-
325-
// Check for PGX connection errors
326-
if IsPGXConnectionError(err) {
327-
return true
328-
}
329-
330-
return false
331-
}

query_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestQuery_MySQL(t *testing.T) {
8080
}
8181

8282
sqlQuery := NewQuery(db, settings, []sqlutil.Converter{}, nil, defaultRowLimit)
83-
_, err := sqlQuery.Run(ctx, q)
83+
_, err := sqlQuery.Run(ctx, q, nil)
8484
if err == nil {
8585
t.Fatal("expected an error but received none")
8686
}

0 commit comments

Comments
 (0)