From 815761b07d1fed4cc65fdbed9ed6f8bc95868e79 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 7 Jan 2016 20:08:14 -0800 Subject: [PATCH 1/4] Update JobStatus's --- nomad/structs/structs.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7970f43fe65..bbb171f1de9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -689,10 +689,9 @@ const ( ) const ( - JobStatusPending = "pending" // Pending means the job is waiting on scheduling - JobStatusRunning = "running" // Running means the entire job is running - JobStatusComplete = "complete" // Complete means there was a clean termination - JobStatusDead = "dead" // Dead means there was abnormal termination + JobStatusPending = "pending" // Pending means the job is waiting on scheduling + JobStatusRunning = "running" // Running means the job has non-terminal allocations + JobStatusDead = "dead" // Dead means all evaluation's and allocations are terminal ) const ( From 894b3e3bd4d4a8e6aa16fd3699724508158efd59 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 8 Jan 2016 18:22:59 -0800 Subject: [PATCH 2/4] Set job status in state store --- nomad/state/state_store.go | 138 ++++++++++++++++++++++++++++ nomad/state/state_store_test.go | 155 ++++++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 651e152ddf2..c0848559660 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -292,12 +292,24 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // Setup the indexes correctly + forceStatus := "" if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index } else { job.CreateIndex = index job.ModifyIndex = index + + if job.IsPeriodic() { + forceStatus = structs.JobStatusRunning + } else { + forceStatus = structs.JobStatusPending + } + } + + // Set the job's status + if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil { + return fmt.Errorf("setting job status failed: %v", err) } // Insert the job @@ -524,11 +536,19 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro watcher.Add(watch.Item{Table: "evals"}) // Do a nested upsert + jobs := make(map[string]string, len(evals)) for _, eval := range evals { watcher.Add(watch.Item{Eval: eval.ID}) if err := s.nestedUpsertEval(txn, index, eval); err != nil { return err } + + jobs[eval.JobID] = "" + } + + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) } txn.Defer(func() { s.watch.notify(watcher) }) @@ -571,6 +591,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{Table: "evals"}) watcher.Add(watch.Item{Table: "allocs"}) + jobs := make(map[string]string, len(evals)) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) if err != nil { @@ -583,6 +604,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("eval delete failed: %v", err) } watcher.Add(watch.Item{Eval: eval}) + jobs[existing.(*structs.Evaluation).JobID] = "" } for _, alloc := range allocs { @@ -601,6 +623,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) + jobs[realAlloc.JobID] = "" } // Update the indexes @@ -611,6 +634,11 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -726,6 +754,16 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + forceStatus := "" + if !copyAlloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs := map[string]string{alloc.JobID: forceStatus} + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -741,6 +779,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher.Add(watch.Item{Table: "allocs"}) // Handle the allocations + jobs := make(map[string]string, len(allocs)) for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -761,6 +800,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + forceStatus := "" + if !alloc.TerminalStatus() { + forceStatus = structs.JobStatusRunning + } + jobs[alloc.JobID] = forceStatus + watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: alloc.EvalID}) watcher.Add(watch.Item{AllocJob: alloc.JobID}) @@ -772,6 +817,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("index update failed: %v", err) } + // Set the job's status + if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + return fmt.Errorf("setting job status failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -906,6 +956,94 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { return iter, nil } +// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. +// It takes a map of job IDs to an optional forceStatus string. It returns an +// error if the job doesn't exist or setJobStatus fails. +func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, + jobs map[string]string, evalDelete bool) error { + for job, forceStatus := range jobs { + existing, err := txn.First("jobs", "id", job) + if err != nil { + return fmt.Errorf("job lookup failed: %v", err) + } + + if existing == nil { + continue + } + + if err := s.setJobStatus(watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + return err + } + } + + return nil +} + +// setJobStatus sets the status of the job by looking up associated evaluations +// and allocations. evalDelete should be set to true if setJobStatus is being +// called because an evaluation is being deleted (potentially because of garbage +// collection). If forceStatus is non-empty, the job's status will be set to the +// passed status. +func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, + job *structs.Job, evalDelete bool, forceStatus string) error { + + watcher.Add(watch.Item{Table: "jobs"}) + watcher.Add(watch.Item{Job: job.ID}) + + // If forceStatus is set, immediately set the job's status + if forceStatus != "" { + job.Status = forceStatus + return nil + } + + allocs, err := txn.Get("allocs", "job", job.ID) + if err != nil { + return err + } + + // If there is a non-terminal allocation, the job is running. + hasAlloc := false + for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { + hasAlloc = true + if !alloc.(*structs.Allocation).TerminalStatus() { + job.Status = structs.JobStatusRunning + return nil + } + } + + evals, err := txn.Get("evals", "job", job.ID) + if err != nil { + return err + } + + hasEval := false + for eval := evals.Next(); eval != nil; eval = evals.Next() { + hasEval = true + if !eval.(*structs.Evaluation).TerminalStatus() { + job.Status = structs.JobStatusPending + return nil + } + } + + // The job is dead if all the allocations and evals are terminal or if there + // are no evals because of garbage collection. + if evalDelete || hasEval || hasAlloc { + job.Status = structs.JobStatusDead + return nil + } + + // If there are no allocations or evaluations it is a new job. If the job is + // periodic, we mark it as running as it will never have an + // allocation/evaluation against it. + if job.IsPeriodic() { + job.Status = structs.JobStatusRunning + } else { + job.Status = structs.JobStatusPending + } + + return nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 5f10d7173ee..1aec05395ef 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1748,6 +1748,161 @@ func TestStateStore_RestoreAlloc(t *testing.T) { notify.verify(t) } +func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { + // Create a mock job. + job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + exp := "foobar" + if err := state.setJobStatus(watcher, txn, job, false, exp); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != exp { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, exp) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs(t *testing.T) { + // Create a mock job. + job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { + // Create a mock job. + job := mock.PeriodicJob() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusRunning { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + } +} + +func TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { + // Create a mock job. + job := mock.Job() + job.Status = "" + + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusDead { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + } +} + +func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock alloc that is dead. + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a mock eval that is complete + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusDead { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + } +} + +func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock alloc that is running. + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.DesiredStatus = structs.AllocDesiredStatusRun + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusRunning { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + } +} + +func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { + state := testStateStore(t) + + // Create a mock job. + job := mock.Job() + job.Status = "" + + // Create a mock eval that is pending. + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusPending + if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + watcher := watch.NewItems() + txn := state.db.Txn(false) + if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { + t.Fatalf("setJobStatus() failed: %v", err) + } + + if job.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + } +} + func TestStateWatch_watch(t *testing.T) { sw := newStateWatch() notify1 := make(chan struct{}, 1) From 875bf476879c5d6380239de5a539c18aac16e848 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 11 Jan 2016 17:34:25 -0800 Subject: [PATCH 3/4] Address comments --- nomad/state/state_store.go | 94 +++++++++++------ nomad/state/state_store_test.go | 177 +++++++++++++++++++++----------- 2 files changed, 179 insertions(+), 92 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c0848559660..338fab7fea5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -292,26 +292,29 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // Setup the indexes correctly - forceStatus := "" if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index + + // Compute the job status + var err error + job.Status, err = s.getJobStatus(txn, job, false) + if err != nil { + return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) + } } else { job.CreateIndex = index job.ModifyIndex = index + // If we are inserting the job for the first time, we don't need to + // calculate the jobs status as it is known. if job.IsPeriodic() { - forceStatus = structs.JobStatusRunning + job.Status = structs.JobStatusRunning } else { - forceStatus = structs.JobStatusPending + job.Status = structs.JobStatusPending } } - // Set the job's status - if err := s.setJobStatus(watcher, txn, job, false, forceStatus); err != nil { - return fmt.Errorf("setting job status failed: %v", err) - } - // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -547,7 +550,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -623,7 +626,6 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e watcher.Add(watch.Item{AllocEval: realAlloc.EvalID}) watcher.Add(watch.Item{AllocJob: realAlloc.JobID}) watcher.Add(watch.Item{AllocNode: realAlloc.NodeID}) - jobs[realAlloc.JobID] = "" } // Update the indexes @@ -635,7 +637,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, true); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, true); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -760,7 +762,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati forceStatus = structs.JobStatusRunning } jobs := map[string]string{alloc.JobID: forceStatus} - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -779,7 +781,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er watcher.Add(watch.Item{Table: "allocs"}) // Handle the allocations - jobs := make(map[string]string, len(allocs)) + jobs := make(map[string]string, 1) for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -800,6 +802,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + // If the allocation is running, force the job to running status. forceStatus := "" if !alloc.TerminalStatus() { forceStatus = structs.JobStatusRunning @@ -818,7 +821,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } // Set the job's status - if err := s.setJobStatuses(watcher, txn, jobs, false); err != nil { + if err := s.setJobStatuses(index, watcher, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } @@ -959,7 +962,7 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. -func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatuses(index uint64, watcher watch.Items, txn *memdb.Txn, jobs map[string]string, evalDelete bool) error { for job, forceStatus := range jobs { existing, err := txn.First("jobs", "id", job) @@ -971,7 +974,7 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, continue } - if err := s.setJobStatus(watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { + if err := s.setJobStatus(index, watcher, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } } @@ -984,21 +987,50 @@ func (s *StateStore) setJobStatuses(watcher watch.Items, txn *memdb.Txn, // called because an evaluation is being deleted (potentially because of garbage // collection). If forceStatus is non-empty, the job's status will be set to the // passed status. -func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, +func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.Txn, job *structs.Job, evalDelete bool, forceStatus string) error { + // Capture the current status so we can check if there is a change + oldStatus := job.Status + newStatus := forceStatus + + // If forceStatus is not set, compute the jobs status. + if forceStatus == "" { + var err error + newStatus, err = s.getJobStatus(txn, job, evalDelete) + if err != nil { + return err + } + } + + // Fast-path if nothing has changed. + if oldStatus == newStatus { + return nil + } + + // The job has changed, so add to watcher. watcher.Add(watch.Item{Table: "jobs"}) watcher.Add(watch.Item{Job: job.ID}) - // If forceStatus is set, immediately set the job's status - if forceStatus != "" { - job.Status = forceStatus - return nil + // Copy and update the existing job + updated := job.Copy() + updated.Status = newStatus + updated.ModifyIndex = index + + // Insert the job + if err := txn.Insert("jobs", updated); err != nil { + return fmt.Errorf("job insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } + return nil +} +func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { allocs, err := txn.Get("allocs", "job", job.ID) if err != nil { - return err + return "", err } // If there is a non-terminal allocation, the job is running. @@ -1006,42 +1038,36 @@ func (s *StateStore) setJobStatus(watcher watch.Items, txn *memdb.Txn, for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { hasAlloc = true if !alloc.(*structs.Allocation).TerminalStatus() { - job.Status = structs.JobStatusRunning - return nil + return structs.JobStatusRunning, nil } } evals, err := txn.Get("evals", "job", job.ID) if err != nil { - return err + return "", err } hasEval := false for eval := evals.Next(); eval != nil; eval = evals.Next() { hasEval = true if !eval.(*structs.Evaluation).TerminalStatus() { - job.Status = structs.JobStatusPending - return nil + return structs.JobStatusPending, nil } } // The job is dead if all the allocations and evals are terminal or if there // are no evals because of garbage collection. if evalDelete || hasEval || hasAlloc { - job.Status = structs.JobStatusDead - return nil + return structs.JobStatusDead, nil } // If there are no allocations or evaluations it is a new job. If the job is // periodic, we mark it as running as it will never have an // allocation/evaluation against it. if job.IsPeriodic() { - job.Status = structs.JobStatusRunning - } else { - job.Status = structs.JobStatusPending + return structs.JobStatusRunning, nil } - - return nil + return structs.JobStatusPending, nil } // StateSnapshot is used to provide a point-in-time snapshot diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1aec05395ef..ff7b690f8a4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1749,80 +1749,147 @@ func TestStateStore_RestoreAlloc(t *testing.T) { } func TestStateStore_SetJobStatus_ForceStatus(t *testing.T) { - // Create a mock job. + state := testStateStore(t) + watcher := watch.NewItems() + txn := state.db.Txn(true) + + // Create and insert a mock job. job := mock.Job() job.Status = "" + job.ModifyIndex = 0 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } - state := testStateStore(t) - watcher := watch.NewItems() - txn := state.db.Txn(false) exp := "foobar" - if err := state.setJobStatus(watcher, txn, job, false, exp); err != nil { + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, exp); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != exp { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, exp) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) } -} + updated := i.(*structs.Job) -func TestStateStore_SetJobStatus_NoEvalsOrAllocs(t *testing.T) { - // Create a mock job. - job := mock.Job() - job.Status = "" + if updated.Status != exp { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, exp) + } + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) + } +} + +func TestStateStore_SetJobStatus_NoOp(t *testing.T) { state := testStateStore(t) watcher := watch.NewItems() - txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending. + job := mock.Job() + job.Status = structs.JobStatusPending + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusPending { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) } -} + updated := i.(*structs.Job) -func TestStateStore_SetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { - // Create a mock job. - job := mock.PeriodicJob() - job.Status = "" + if updated.ModifyIndex == index { + t.Fatalf("setJobStatus() should have been a no-op") + } +} +func TestStateStore_SetJobStatus(t *testing.T) { state := testStateStore(t) watcher := watch.NewItems() - txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { + txn := state.db.Txn(true) + + // Create and insert a mock job that should be pending but has an incorrect + // status. + job := mock.Job() + job.Status = "foobar" + job.ModifyIndex = 10 + if err := txn.Insert("jobs", job); err != nil { + t.Fatalf("job insert failed: %v", err) + } + + index := uint64(1000) + if err := state.setJobStatus(index, watcher, txn, job, false, ""); err != nil { t.Fatalf("setJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusRunning { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + i, err := txn.First("jobs", "id", job.ID) + if err != nil { + t.Fatalf("job lookup failed: %v", err) + } + updated := i.(*structs.Job) + + if updated.Status != structs.JobStatusPending { + t.Fatalf("setJobStatus() set %v; expected %v", updated.Status, structs.JobStatusPending) + } + + if updated.ModifyIndex != index { + t.Fatalf("setJobStatus() set %d; expected %d", updated.ModifyIndex, index) } } -func TestStateStore_SetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { - // Create a mock job. +func TestStateStore_GetJobStatus_NoEvalsOrAllocs(t *testing.T) { job := mock.Job() - job.Status = "" + state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) + } +} +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_Periodic(t *testing.T) { + job := mock.PeriodicJob() state := testStateStore(t) - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusDead { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) } } -func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { +func TestStateStore_GetJobStatus_NoEvalsOrAllocs_EvalDelete(t *testing.T) { + job := mock.Job() state := testStateStore(t) + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } - // Create a mock job. + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + +func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { + state := testStateStore(t) job := mock.Job() - job.Status = "" // Create a mock alloc that is dead. alloc := mock.Alloc() @@ -1840,23 +1907,20 @@ func TestStateStore_SetJobStatus_DeadEvalsAndAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, false, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusDead { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusDead) + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) } } -func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { +func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { state := testStateStore(t) - - // Create a mock job. job := mock.Job() - job.Status = "" // Create a mock alloc that is running. alloc := mock.Alloc() @@ -1866,23 +1930,20 @@ func TestStateStore_SetJobStatus_RunningAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusRunning { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusRunning) + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) } } func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { state := testStateStore(t) - - // Create a mock job. job := mock.Job() - job.Status = "" // Create a mock eval that is pending. eval := mock.Eval() @@ -1892,14 +1953,14 @@ func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { t.Fatalf("err: %v", err) } - watcher := watch.NewItems() txn := state.db.Txn(false) - if err := state.setJobStatus(watcher, txn, job, true, ""); err != nil { - t.Fatalf("setJobStatus() failed: %v", err) + status, err := state.getJobStatus(txn, job, true) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) } - if job.Status != structs.JobStatusPending { - t.Fatalf("setJobStatus() set %v; expected %v", job.Status, structs.JobStatusPending) + if status != structs.JobStatusPending { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusPending) } } From 47fbfd3000787b22dcfa58ebc61472c47251a826 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 12 Jan 2016 09:50:33 -0800 Subject: [PATCH 4/4] Add JobModifyIndex --- nomad/job_endpoint_test.go | 1 + nomad/mock/mock.go | 7 ++++--- nomad/state/state_store.go | 2 ++ nomad/structs/structs.go | 5 +++-- scheduler/util.go | 5 +---- scheduler/util_test.go | 4 ++-- 6 files changed, 13 insertions(+), 11 deletions(-) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 236028c4b9a..7c313e8d01a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -532,6 +532,7 @@ func TestJobEndpoint_GetJob(t *testing.T) { } job.CreateIndex = resp.JobModifyIndex job.ModifyIndex = resp.JobModifyIndex + job.JobModifyIndex = resp.JobModifyIndex // Lookup the job get := &structs.JobSpecificRequest{ diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 3408e0f972e..38b4a49af58 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -124,9 +124,10 @@ func Job() *structs.Job { Meta: map[string]string{ "owner": "armon", }, - Status: structs.JobStatusPending, - CreateIndex: 42, - ModifyIndex: 99, + Status: structs.JobStatusPending, + CreateIndex: 42, + ModifyIndex: 99, + JobModifyIndex: 99, } job.InitFields() return job diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 338fab7fea5..553bcbf0777 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -295,6 +295,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index + job.JobModifyIndex = index // Compute the job status var err error @@ -305,6 +306,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } else { job.CreateIndex = index job.ModifyIndex = index + job.JobModifyIndex = index // If we are inserting the job for the first time, we don't need to // calculate the jobs status as it is known. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index bbb171f1de9..68b7e6e8d42 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -777,8 +777,9 @@ type Job struct { StatusDescription string // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 + JobModifyIndex uint64 } // InitFields is used to initialize fields in the Job. This should be called diff --git a/scheduler/util.go b/scheduler/util.go index f448687c5ec..29fed93427f 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -92,10 +92,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, } // If the definition is updated we need to update - // XXX: This is an extremely conservative approach. We can check - // if the job definition has changed in a way that affects - // this allocation and potentially ignore it. - if job.ModifyIndex != exist.Job.ModifyIndex { + if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ Name: name, TaskGroup: tg, diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 7b213da77b5..3ee813090f8 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -38,7 +38,7 @@ func TestDiffAllocs(t *testing.T) { // The "old" job has a previous modify index oldJob := new(structs.Job) *oldJob = *job - oldJob.ModifyIndex -= 1 + oldJob.JobModifyIndex -= 1 tainted := map[string]bool{ "dead": true, @@ -119,7 +119,7 @@ func TestDiffSystemAllocs(t *testing.T) { // The "old" job has a previous modify index oldJob := new(structs.Job) *oldJob = *job - oldJob.ModifyIndex -= 1 + oldJob.JobModifyIndex -= 1 tainted := map[string]bool{ "dead": true,