From a2de2cc5c8b275faf00e490a1cad1853bb0169f2 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Mon, 16 Dec 2019 23:08:24 -0500 Subject: [PATCH 1/2] fix(kv): Store canceled task runs in the correct bucket. Task runs are stored and retrieved from the `taskRunsv1` bucket, but when they are canceled they are incorrectly placed in the `tasksv1` bucket. Once this has been done, further look ups of the task run fail, because it is located in the wrong bucket. This addresses the problem by placing them back into the `taskRunsv1` bucket. An additional test has been added to ensure we are able to successfully read a canceled run. --- kv/task.go | 2 +- kv/task_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/kv/task.go b/kv/task.go index fe863a94b82..2aa2083bb49 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1051,7 +1051,7 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I run.Status = "canceled" // save - bucket, err := tx.Bucket(taskBucket) + bucket, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } diff --git a/kv/task_test.go b/kv/task_test.go index cadb462a063..c6713a58b84 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -370,3 +370,73 @@ func TestService_UpdateTask_InactiveToActive(t *testing.T) { t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String())) } } + +func TestTaskRunCancellation(t *testing.T) { + store, close, err := NewTestBoltStore(t) + if err != nil { + t.Fatal(err) + } + defer close() + + service := kv.NewService(zaptest.NewLogger(t), store) + ctx, cancelFunc := context.WithCancel(context.Background()) + if err := service.Initialize(ctx); err != nil { + t.Fatalf("error initializing urm service: %v", err) + } + defer cancelFunc() + u := &influxdb.User{Name: t.Name() + "-user"} + if err := service.CreateUser(ctx, u); err != nil { + t.Fatal(err) + } + o := &influxdb.Organization{Name: t.Name() + "-org"} + if err := service.CreateOrganization(ctx, o); err != nil { + t.Fatal(err) + } + + if err := service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ + ResourceType: influxdb.OrgsResourceType, + ResourceID: o.ID, + UserID: u.ID, + UserType: influxdb.Owner, + }); err != nil { + t.Fatal(err) + } + + authz := influxdb.Authorization{ + OrgID: o.ID, + UserID: u.ID, + Permissions: influxdb.OperPermissions(), + } + if err := service.CreateAuthorization(context.Background(), &authz); err != nil { + t.Fatal(err) + } + + ctx = icontext.SetAuthorizer(ctx, &authz) + + task, err := service.CreateTask(ctx, influxdb.TaskCreate{ + Flux: `option task = {name: "a task",cron: "0 * * * *", offset: 20s} from(bucket:"test") |> range(start:-1h)`, + OrganizationID: o.ID, + OwnerID: u.ID, + }) + if err != nil { + t.Fatal(err) + } + + run, err := service.CreateNextRun(ctx, task.ID, time.Now().Add(time.Hour).Unix()) + if err != nil { + t.Fatal(err) + } + + if err := service.CancelRun(ctx, run.Created.TaskID, run.Created.RunID); err != nil { + t.Fatal(err) + } + + canceled, err := service.FindRunByID(ctx, run.Created.TaskID, run.Created.RunID) + if err != nil { + t.Fatal(err) + } + + if canceled.Status != backend.RunCanceled.String() { + t.Fatalf("expected task run to be cancelled") + } +} From 6de01948f3672d048d5408e96cbe20f16e5c5911 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Tue, 17 Dec 2019 11:19:40 -0500 Subject: [PATCH 2/2] chore(changelog): Update changelog with task run cancelation fix. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c7f81c877b..9c50a572afb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Bug Fixes 1. [16235](https://github.com/influxdata/influxdb/pull/16235): Removed default frontend sorting when flux queries specify sorting +1. [16238](https://github.com/influxdata/influxdb/pull/16238): Store canceled task runs in the correct bucket ### UI Improvements