@@ -78,12 +78,29 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
7878 start := time .Now ()
7979 rows , err := q .DB .QueryContext (ctx , query .RawSQL , args ... )
8080 if err != nil {
81+ // Determine error source based on retry configuration and error type
82+ errSource := backend .ErrorSourcePlugin
8183 errType := ErrorQuery
84+
8285 if errors .Is (err , context .Canceled ) {
8386 errType = context .Canceled
87+ errSource = backend .ErrorSourcePlugin
88+ } else if IsPGXConnectionError (err ) {
89+ errType = ErrorPGXLifecycle
90+ errSource = backend .ErrorSourceDownstream
91+ } else {
92+ // Use enhanced error classification for PGX v5
93+ errSource , _ = ClassifyError (err )
94+ }
95+
96+ var errWithSource error
97+ if errSource == backend .ErrorSourceDownstream {
98+ errWithSource = backend .DownstreamError (fmt .Errorf ("%w: %s" , errType , err .Error ()))
99+ } else {
100+ errWithSource = backend .PluginError (fmt .Errorf ("%w: %s" , errType , err .Error ()))
84101 }
85- errWithSource := backend . DownstreamError ( fmt . Errorf ( "%w: %s" , errType , err . Error ()))
86- q .metrics .CollectDuration (SourceDownstream , StatusError , time .Since (start ).Seconds ())
102+
103+ q .metrics .CollectDuration (Source ( errSource ) , StatusError , time .Since (start ).Seconds ())
87104 return sqlutil .ErrorFrameFromQuery (query ), errWithSource
88105 }
89106 q .metrics .CollectDuration (SourceDownstream , StatusOK , time .Since (start ).Seconds ())
@@ -96,8 +113,16 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
96113 errWithSource := backend .DownstreamError (fmt .Errorf ("%s: %w" , "No results from query" , err ))
97114 return sqlutil .ErrorFrameFromQuery (query ), errWithSource
98115 }
99- errWithSource := backend .DownstreamError (fmt .Errorf ("%s: %w" , "Error response from database" , err ))
100- q .metrics .CollectDuration (SourceDownstream , StatusError , time .Since (start ).Seconds ())
116+
117+ errSource , _ := ClassifyError (err )
118+ var errWithSource error
119+ if errSource == backend .ErrorSourceDownstream {
120+ errWithSource = backend .DownstreamError (fmt .Errorf ("%s: %w" , "Error response from database" , err ))
121+ } else {
122+ errWithSource = backend .PluginError (fmt .Errorf ("%s: %w" , "Error response from database" , err ))
123+ }
124+
125+ q .metrics .CollectDuration (Source (errSource ), StatusError , time .Since (start ).Seconds ())
101126 return sqlutil .ErrorFrameFromQuery (query ), errWithSource
102127 }
103128
@@ -111,11 +136,13 @@ func (q *DBQuery) Run(ctx context.Context, query *Query, args ...interface{}) (d
111136 // Convert the response to frames
112137 res , err := getFrames (rows , q .rowLimit , q .converters , q .fillMode , query )
113138 if err != nil {
114- // We default to plugin error source
115- errSource := backend .ErrorSourcePlugin
139+ errSource , _ := ClassifyError (err )
140+
141+ // Additional checks for processing errors
116142 if backend .IsDownstreamHTTPError (err ) || isProcessingDownstreamError (err ) {
117143 errSource = backend .ErrorSourceDownstream
118144 }
145+
119146 errWithSource := backend .NewErrorWithSource (fmt .Errorf ("%w: %s" , err , "Could not process SQL results" ), errSource )
120147 q .metrics .CollectDuration (Source (errSource ), StatusError , time .Since (start ).Seconds ())
121148 return sqlutil .ErrorFrameFromQuery (query ), errWithSource
@@ -212,12 +239,12 @@ func accessColumns(rows *sql.Rows) (columnErr error) {
212239// validateRows performs safety checks on SQL rows to prevent panics
213240func validateRows (rows * sql.Rows ) error {
214241 if rows == nil {
215- return fmt .Errorf ("rows is nil" )
242+ return fmt .Errorf ("%w: rows is nil" , ErrorRowValidation )
216243 }
217244
218245 err := accessColumns (rows )
219246 if err != nil {
220- return fmt .Errorf ("failed to validate rows : %w" , err )
247+ return fmt .Errorf ("%w : %w" , ErrorRowValidation , err )
221248 }
222249 return nil
223250}
@@ -285,11 +312,25 @@ func isProcessingDownstreamError(err error) bool {
285312 data .ErrorInputFieldsWithoutRows ,
286313 data .ErrorSeriesUnsorted ,
287314 data .ErrorNullTimeValues ,
315+ ErrorRowValidation ,
316+ ErrorConnectionClosed ,
317+ ErrorPGXLifecycle ,
288318 }
289319 for _ , e := range downstreamErrors {
290320 if errors .Is (err , e ) {
291321 return true
292322 }
293323 }
324+
325+ // Check for generic downstream errors
326+ if IsGenericDownstreamError (err ) {
327+ return true
328+ }
329+
330+ // Check for PGX connection errors
331+ if IsPGXConnectionError (err ) {
332+ return true
333+ }
334+
294335 return false
295336}
0 commit comments