diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 651e152ddf2..c0848559660 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -292,12 +292,24 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // Setup the indexes correctly + forceStatus := "" if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index } else { job.CreateIndex = index job.ModifyIndex = index + + if job.IsPeriodic() { + forceStatus = structs.JobStatusRunning + } else { + forceStatus = structs.JobStatusPending + } + } + + // Set the job's status + if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil { + return fmt.Errorf("setting job status failed: %v", err) } // Insert the job @@ -524,11 +536,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro watcher.Add(watch.Item{Table: "evals"}) // Do a nested upsert + jobs := make(map[string]string, len(evals)) for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) if err := s.nestedUpsertEval(txn, index, eval); err != nil { return err } + + jobs[eval.JobID] = "" + } + + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) } txn.Defer(func() { s.watch.notify(watcher) }) @@ -571,6 +591,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{Table: "evals"}) watcher.Add(watch.Item{Table: "allocs"}) + jobs := make(map[string]string, len(evals)) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) if err != nil { @@ -583,6 +604,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("eval delete failed: %v", err) } watcher.Add(watch.Item{Eval: eval}) + jobs[existing.(*structs.Evaluation).JobID] = "" } for _, alloc := range allocs { @@ -601,6 +623,7 @@ func (s 
*StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) + jobs[realAlloc.JobID] = "" } // Update the indexes @@ -611,6 +634,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -726,6 +754,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + forceStatus := "" + if !copyAlloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs := map[string]string{alloc.JobID: forceStatus} + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -741,6 +779,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher.Add(watch.Item{Table: "allocs"}) // Handle the allocations + jobs := make(map[string]string, len(allocs)) for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -761,6 +800,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + forceStatus := "" + if !alloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs[alloc.JobID] = forceStatus + watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: alloc.EvalID}) watcher.Add(watch.Item{AllocJob: alloc.JobID}) @@ -772,6 +817,11 @@ func (s *StateStore) 
UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -906,6 +956,94 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { return iter, nil } +// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. +// It takes a map of job IDs to an optional forceStatus string. Unknown job IDs +// are skipped; an error is returned if a lookup or setJobStatus fails. +func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, + jobs map[string]string, evalDelete bool) error { + for job, forceStatus := range jobs { + existing, err := txn.First("jobs", "id", job) + if err != nil { + return fmt.Errorf("job lookup failed: %v", err) + } + + if existing == nil { + continue + } + + if err := s.setJobStatus(watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + return err + } + } + + return nil +} + +// setJobStatus sets the status of the job by looking up associated evaluations +// and allocations. evalDelete should be set to true if setJobStatus is being +// called because an evaluation is being deleted (potentially because of garbage +// collection). If forceStatus is non-empty, the job's status will be set to the +// passed status. +func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, + job *structs.Job, evalDelete bool, forceStatus string) error { + + watcher.Add(watch.Item{Table: "jobs"}) + watcher.Add(watch.Item{Job: job.ID}) + + // If forceStatus is set, immediately set the job's status + if forceStatus != "" { + job.Status = forceStatus + return nil + } + + allocs, err := txn.Get("allocs", "job", job.ID) + if err != nil { + return err + } + + // If there is a non-terminal allocation, the job is running.
+ hasAlloc := false + for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { + hasAlloc = true + if !alloc.(*structs.Allocation).TerminalStatus() { + job.Status = structs.JobStatusRunning + return nil + } + } + + evals, err := txn.Get("evals", "job", job.ID) + if err != nil { + return err + } + + hasEval := false + for eval := evals.Next(); eval != nil; eval = evals.Next() { + hasEval = true + if !eval.(*structs.Evaluation).TerminalStatus() { + job.Status = structs.JobStatusPending + return nil + } + } + + // The job is dead if all the allocations and evals are terminal or if there + // are no evals because of garbage collection. + if evalDelete || hasEval || hasAlloc { + job.Status = structs.JobStatusDead + return nil + } + + // If there are no allocations or evaluations it is a new job. If the job is + // periodic, we mark it as running as it will never have an + // allocation/evaluation against it. + if job.IsPeriodic() { + job.Status = structs.JobStatusRunning + } else { + job.Status = structs.JobStatusPending + } + + return nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 5f10d7173ee..1aec05395ef 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1748,6 +1748,161 @@ func TestStateStore_RestoreAlloc(t *testing.T) { notify.verify(t) } +func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { + // Create a mock job. + job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + exp := "foobar" + if err := state.setJobStatus(watcher, txn, job, false, exp); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != exp { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, exp) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs(t *testing.T) { + // Create a mock job. 
+ job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { + // Create a mock job. + job := mock.PeriodicJob() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusRunning { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { + // Create a mock job. + job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusDead { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + } +} + +func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock alloc that is dead. 
+ alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a mock eval that is complete + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusDead { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + } +} + +func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock alloc that is running. + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusRun + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusRunning { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + } +} + +func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock eval that is pending. 
+ eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusPending + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + } +} + func TestStateWatch_watch(t *testing.T) { sw := newStateWatch() notify1 := make(chan struct{}, 1)