diff --git a/CHANGELOG.md b/CHANGELOG.md index 12a388e4b8a..ec276e21dd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## Unreleased + +### Bug Fixes + +1. [14287](https://github.com/influxdata/influxdb/pull/14287) Fix incorrect reporting of task as successful when error occurs during result iteration + ## v2.0.0-alpha.14 [2019-06-28] ### Features diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index b65a4ec1779..6880e3110f7 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -180,7 +180,7 @@ func (p *syncRunPromise) doQuery(wg *sync.WaitGroup) { for it.More() { // Consume the full iterator so that we don't leak outstanding iterators. res := it.Next() - if err := exhaustResultIterators(res); err != nil { + if err = exhaustResultIterators(res); err != nil { p.logger.Info("Error exhausting result iterator", zap.Error(err), zap.String("name", res.Name())) } } @@ -189,8 +189,12 @@ func (p *syncRunPromise) doQuery(wg *sync.WaitGroup) { // It's safe for Release to be called multiple times. it.Release() + if err == nil { + err = it.Err() + } + // Is it okay to assume it.Err will be set if the query context is canceled? - p.finish(&runResult{err: it.Err(), statistics: it.Statistics()}, nil) + p.finish(&runResult{err: err, statistics: it.Statistics()}, nil) } func (p *syncRunPromise) cancelOnContextDone(wg *sync.WaitGroup) { diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go index 846c613d206..7574a51d887 100644 --- a/task/backend/executor/task_executor.go +++ b/task/backend/executor/task_executor.go @@ -245,11 +245,15 @@ type workerMaker struct { } func (wm *workerMaker) new() interface{} { - return &worker{wm.te} + return &worker{wm.te, exhaustResultIterators} } type worker struct { te *TaskExecutor + + // exhaustResultIterators is used to exhaust the result + // of a flux query + exhaustResultIterators func(res flux.Result) error } func (w *worker) work() { @@ -376,17 +380,22 @@ func (w *worker) executeQuery(p *Promise) { return } + var runErr error // Drain the result iterator. for it.More() { // Consume the full iterator so that we don't leak outstanding iterators. res := it.Next() - if err := exhaustResultIterators(res); err != nil { - w.te.logger.Info("Error exhausting result iterator", zap.Error(err), zap.String("name", res.Name())) + if runErr = w.exhaustResultIterators(res); runErr != nil { + w.te.logger.Info("Error exhausting result iterator", zap.Error(runErr), zap.String("name", res.Name())) } } it.Release() + if runErr == nil { + runErr = it.Err() + } + // log the statistics on the run stats := it.Statistics() @@ -395,7 +404,7 @@ func (w *worker) executeQuery(p *Promise) { w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b)) } - w.finish(p, backend.RunSuccess, it.Err()) + w.finish(p, backend.RunSuccess, runErr) } // Promise represents a promise the executor makes to finish a run's execution asynchronously. diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go index f9e37607e97..47022289435 100644 --- a/task/backend/executor/task_executor_test.go +++ b/task/backend/executor/task_executor_test.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" + "sync" "testing" "time" + "github.com/influxdata/flux" "github.com/influxdata/influxdb" platform "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" @@ -57,6 +59,7 @@ func TestTaskExecutor(t *testing.T) { t.Run("WorkerLimit", testWorkerLimit) t.Run("LimitFunc", testLimitFunc) t.Run("Metrics", testMetrics) + t.Run("IteratorFailure", testIteratorFailure) } func testQuerySuccess(t *testing.T) { @@ -358,3 +361,46 @@ func testMetrics(t *testing.T) { t.Fatal(got) } } + +func testIteratorFailure(t *testing.T) { + t.Parallel() + tes := taskExecutorSystem(t) + + // replace iterator exhaust function with one which errors + tes.ex.workerPool = sync.Pool{New: func() interface{} { + return &worker{tes.ex, func(flux.Result) error { + return errors.New("something went wrong exhausting iterator") + }} + }} + + script := fmt.Sprintf(fmtTestScript, t.Name()) + ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + if err != nil { + t.Fatal(err) + } + + promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + if err != nil { + t.Fatal(err) + } + promiseID := influxdb.ID(promise.ID()) + + run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID) + if err != nil { + t.Fatal(err) + } + + if run.ID != promiseID { + t.Fatal("promise and run dont match") + } + + tes.svc.WaitForQueryLive(t, script) + tes.svc.SucceedQuery(script) + + <-promise.Done() + + if got := promise.Error(); got == nil { + t.Fatal("got no error when I should have") + } +}