Skip to content

Commit affb0fb

Browse files
committed
fix(kv): Store canceled task runs in the correct bucket.
Task runs are stored and retrieved from the `taskRunsv1` bucket, but when they are canceled they are incorrectly placed in the `tasksv1` bucket. This addresses the problem by placing them back into the `taskRunsv1` bucket. An additional test has been added to ensure we are able to successfully read a canceled run.
1 parent fc9df92 commit affb0fb

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

kv/task.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1051,7 +1051,7 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
10511051
run.Status = "canceled"
10521052

10531053
// save
1054-
bucket, err := tx.Bucket(taskBucket)
1054+
bucket, err := tx.Bucket(taskRunBucket)
10551055
if err != nil {
10561056
return influxdb.ErrUnexpectedTaskBucketErr(err)
10571057
}

kv/task_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -370,3 +370,73 @@ func TestService_UpdateTask_InactiveToActive(t *testing.T) {
370370
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
371371
}
372372
}
373+
374+
func TestTaskRunCancellation(t *testing.T) {
375+
store, close, err := NewTestBoltStore(t)
376+
if err != nil {
377+
t.Fatal(err)
378+
}
379+
defer close()
380+
381+
service := kv.NewService(zaptest.NewLogger(t), store)
382+
ctx, cancelFunc := context.WithCancel(context.Background())
383+
if err := service.Initialize(ctx); err != nil {
384+
t.Fatalf("error initializing urm service: %v", err)
385+
}
386+
defer cancelFunc()
387+
u := &influxdb.User{Name: t.Name() + "-user"}
388+
if err := service.CreateUser(ctx, u); err != nil {
389+
t.Fatal(err)
390+
}
391+
o := &influxdb.Organization{Name: t.Name() + "-org"}
392+
if err := service.CreateOrganization(ctx, o); err != nil {
393+
t.Fatal(err)
394+
}
395+
396+
if err := service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
397+
ResourceType: influxdb.OrgsResourceType,
398+
ResourceID: o.ID,
399+
UserID: u.ID,
400+
UserType: influxdb.Owner,
401+
}); err != nil {
402+
t.Fatal(err)
403+
}
404+
405+
authz := influxdb.Authorization{
406+
OrgID: o.ID,
407+
UserID: u.ID,
408+
Permissions: influxdb.OperPermissions(),
409+
}
410+
if err := service.CreateAuthorization(context.Background(), &authz); err != nil {
411+
t.Fatal(err)
412+
}
413+
414+
ctx = icontext.SetAuthorizer(ctx, &authz)
415+
416+
task, err := service.CreateTask(ctx, influxdb.TaskCreate{
417+
Flux: `option task = {name: "a task",cron: "0 * * * *", offset: 20s} from(bucket:"test") |> range(start:-1h)`,
418+
OrganizationID: o.ID,
419+
OwnerID: u.ID,
420+
})
421+
if err != nil {
422+
t.Fatal(err)
423+
}
424+
425+
run, err := service.CreateNextRun(ctx, task.ID, time.Now().Add(time.Hour).Unix())
426+
if err != nil {
427+
t.Fatal(err)
428+
}
429+
430+
if err := service.CancelRun(ctx, run.Created.TaskID, run.Created.RunID); err != nil {
431+
t.Fatal(err)
432+
}
433+
434+
canceled, err := service.FindRunByID(ctx, run.Created.TaskID, run.Created.RunID)
435+
if err != nil {
436+
t.Fatal(err)
437+
}
438+
439+
if canceled.Status != backend.RunCanceled.String() {
440+
t.Fatalf("expected task run to be cancelled")
441+
}
442+
}

0 commit comments

Comments
 (0)