Skip to content

Commit

Permalink
Merge pull request #6996 from hashicorp/system-sched-ineligible-updates
Browse files Browse the repository at this point in the history
System sched ignore ineligible updates
  • Loading branch information
drewbailey authored Feb 3, 2020
2 parents e8136c0 + e86988c commit 0eb3586
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ BUG FIXES:
* drivers: Fixed a bug where exec, java, and raw_exec drivers collected and emited stats every second regardless of the telemetry config [[GH-7043](https://github.com/hashicorp/nomad/issues/7043)]
* server: Fixed a deadlock that may occur when server leadership flaps very quickly [[GH-6977](https://github.com/hashicorp/nomad/issues/6977)]
* scheduler: Fixed a bug that caused evicted allocs on a lost node to be stuck in running. [[GH-6902](https://github.com/hashicorp/nomad/issues/6902)]
* scheduler: Fixed a bug where `nomad job plan/apply` returned errors instead of a partial placement warning for ineligible nodes. [[GH-6968](https://github.com/hashicorp/nomad/issues/6968)]
* scheduler: Fixed a bug where `nomad job plan/apply` returned errors instead of ignoring system job updates for ineligible nodes. [[GH-6996](https://github.com/hashicorp/nomad/issues/6996)]

## 0.10.3 (January 29, 2020)

Expand Down
5 changes: 0 additions & 5 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
node, ok := nodeByID[missing.Alloc.NodeID]
if !ok {
s.logger.Debug("could not find node %q", missing.Alloc.NodeID)
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}

s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
continue
}

Expand Down
194 changes: 180 additions & 14 deletions scheduler/system_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,158 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {

}

// This test ensures that the scheduler correctly ignores ineligible
// nodes when scheduling due to a new node being added. The job has two
// task groups contrained to a particular node class. The desired behavior
// should be that the TaskGroup constrained to the newly added node class is
// added and that the TaskGroup constrained to the ineligible node is ignored.
func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
h := NewHarness(t)

// Create two nodes
var node *structs.Node
node = mock.Node()
node.NodeClass = "Class-A"
node.ComputeClass()
require.Nil(t, h.State.UpsertNode(h.NextIndex(), node))

var nodeB *structs.Node
nodeB = mock.Node()
nodeB.NodeClass = "Class-B"
nodeB.ComputeClass()
require.Nil(t, h.State.UpsertNode(h.NextIndex(), nodeB))

// Make a job with two task groups, each constraint to a node class
job := mock.SystemJob()
tgA := job.TaskGroups[0]
tgA.Name = "groupA"
tgA.Constraints = []*structs.Constraint{
{
LTarget: "${node.class}",
RTarget: node.NodeClass,
Operand: "=",
},
}
tgB := job.TaskGroups[0].Copy()
tgB.Name = "groupB"
tgB.Constraints = []*structs.Constraint{
{
LTarget: "${node.class}",
RTarget: nodeB.NodeClass,
Operand: "=",
},
}

// Upsert Job
job.TaskGroups = []*structs.TaskGroup{tgA, tgB}
require.Nil(t, h.State.UpsertJob(h.NextIndex(), job))

// Evaluate the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}

require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

require.Nil(t, h.Process(NewSystemScheduler, eval))
require.Equal(t, "complete", h.Evals[0].Status)

// QueuedAllocations is drained
val, ok := h.Evals[0].QueuedAllocations["groupA"]
require.True(t, ok)
require.Equal(t, 0, val)

val, ok = h.Evals[0].QueuedAllocations["groupB"]
require.True(t, ok)
require.Equal(t, 0, val)

// Single plan with two NodeAllocations
require.Len(t, h.Plans, 1)
require.Len(t, h.Plans[0].NodeAllocation, 2)

// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible

// Evaluate the node update
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}

require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)

// Ensure no new plans
require.Equal(t, 1, len(h.Plans))

// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[0].NodeAllocation {
require.Len(t, allocs, 1)
require.Equal(t, eval.ID, allocs[0].EvalID)
}

// Add a new node Class-B
var nodeBTwo *structs.Node
nodeBTwo = mock.Node()
nodeBTwo.ComputeClass()
nodeBTwo.NodeClass = "Class-B"
require.Nil(t, h.State.UpsertNode(h.NextIndex(), nodeBTwo))

// Evaluate the new node
eval3 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: nodeBTwo.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}

// Ensure New eval is complete
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval3}))
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)

// Ensure no failed TG allocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))

require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[1].NodeAllocation {
require.Len(t, allocs, 1)
require.Equal(t, eval3.ID, allocs[0].EvalID)
}

ws := memdb.NewWatchSet()

allocsNodeOne, err := h.State.AllocsByNode(ws, node.ID)
require.NoError(t, err)
require.Len(t, allocsNodeOne, 1)

allocsNodeTwo, err := h.State.AllocsByNode(ws, nodeB.ID)
require.NoError(t, err)
require.Len(t, allocsNodeTwo, 1)

allocsNodeThree, err := h.State.AllocsByNode(ws, nodeBTwo.ID)
require.NoError(t, err)
require.Len(t, allocsNodeThree, 1)
}

// No errors reported when no available nodes prevent placement
func TestSystemSched_NoNodes(t *testing.T) {
func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
h := NewHarness(t)

var node *structs.Node
Expand Down Expand Up @@ -1348,28 +1498,44 @@ func TestSystemSched_NoNodes(t *testing.T) {

// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
// Evaluate the job
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)

// Create a new job version, deploy
job2 := job.Copy()
job2.Meta["version"] = "2"
require.Nil(t, h.State.UpsertJob(h.NextIndex(), job2))

eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
// Run evaluation as a plan
eval3 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

// Ensure New eval is complete
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval3}))
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)

// Ensure there is a FailedTGAlloc metric
require.Equal(t, 1, len(h.Evals[1].FailedTGAllocs))
// Ensure there are no FailedTGAllocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))
require.Equal(t, 0, h.Evals[2].QueuedAllocations[job2.Name])
}

// No errors reported when constraints prevent placement
Expand Down Expand Up @@ -1697,7 +1863,7 @@ func TestSystemSched_Preemption(t *testing.T) {
var nodes []*structs.Node
for i := 0; i < 2; i++ {
node := mock.Node()
//TODO(preetha): remove in 0.11
// TODO(preetha): remove in 0.11
node.Resources = &structs.Resources{
CPU: 3072,
MemoryMB: 5034,
Expand Down
63 changes: 36 additions & 27 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (d *diffResult) Append(other *diffResult) {
d.lost = append(d.lost, other.lost...)
}

// diffAllocs is used to do a set difference between the target allocations
// and the existing allocations. This returns 6 sets of results, the list of
// named task groups that need to be placed (no existing allocation), the
// diffSystemAllocsForNode is used to do a set difference between the target allocations
// and the existing allocations for a particular node. This returns 6 sets of results,
// the list of named task groups that need to be placed (no existing allocation), the
// allocations that need to be updated (job definition is newer), allocs that
// need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), those that should be ignored and those that are lost
Expand All @@ -67,7 +67,8 @@ func (d *diffResult) Append(other *diffResult) {
// required is a set of allocations that must exist.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
func diffSystemAllocsForNode(job *structs.Job, nodeID string,
eligibleNodes, taintedNodes map[string]*structs.Node,
required map[string]*structs.TaskGroup, allocs []*structs.Allocation,
terminalAllocs map[string]*structs.Allocation) *diffResult {
result := &diffResult{}
Expand Down Expand Up @@ -126,6 +127,12 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
continue
}

// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := eligibleNodes[nodeID]; !ok {
goto IGNORE
}

// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Expand All @@ -152,19 +159,37 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,

// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
// update or ignore above.
// update or ignore above. Ignore placements for tainted or
// ineligible nodes
if !ok {
result.place = append(result.place, allocTuple{
// Tainted and ineligible nodes for a non existing alloc
// should be filtered out and not count towards ignore or place
if _, tainted := taintedNodes[nodeID]; tainted {
continue
}
if _, eligible := eligibleNodes[nodeID]; !eligible {
continue
}

allocTuple := allocTuple{
Name: name,
TaskGroup: tg,
Alloc: terminalAllocs[name],
})
}

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
if allocTuple.Alloc == nil || allocTuple.Alloc.NodeID != nodeID {
allocTuple.Alloc = &structs.Allocation{NodeID: nodeID}
}
result.place = append(result.place, allocTuple)
}
}
return result
}

// diffSystemAllocs is like diffAllocs however, the allocations in the
// diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
//
// job is the job whose allocs is going to be diff-ed.
Expand All @@ -183,36 +208,20 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
nodeAllocs[alloc.NodeID] = nallocs
}

eligibleNodes := make(map[string]*structs.Node)
for _, node := range nodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
eligibleNodes[node.ID] = node
}

// Create the required task groups.
required := materializeTaskGroups(job)

result := &diffResult{}
for nodeID, allocs := range nodeAllocs {
diff := diffAllocs(job, taintedNodes, required, allocs, terminalAllocs)

// If the node is tainted there should be no placements made
if _, ok := taintedNodes[nodeID]; ok {
diff.place = nil
} else {
// Mark the alloc as being for a specific node.
for i := range diff.place {
alloc := &diff.place[i]

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
if alloc.Alloc == nil || alloc.Alloc.NodeID != nodeID {
alloc.Alloc = &structs.Allocation{NodeID: nodeID}
}
}
}

diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs)
result.Append(diff)
}

Expand Down
Loading

0 comments on commit 0eb3586

Please sign in to comment.