Skip to content

Commit

Permalink
Adding PlatformFilter for scheduling
Browse files Browse the repository at this point in the history
Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
  • Loading branch information
nishanttotla committed May 4, 2017
1 parent 78f240e commit 8edbb92
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 0 deletions.
43 changes: 43 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,46 @@ func (f *ConstraintFilter) Explain(nodes int) string {
}
return fmt.Sprintf("scheduling constraints not satisfied on %d nodes", nodes)
}

// PlatformFilter selects only nodes that run the required platform.
type PlatformFilter struct {
supportedPlatforms []*api.Platform
}

// SetTask returns true when the filter is enabled for a given task.
func (f *PlatformFilter) SetTask(t *api.Task) bool {
placement := t.Spec.Placement
if placement != nil {
// copy the platform information
f.supportedPlatforms = placement.Platforms
if len(placement.Platforms) > 0 {
return true
}
}
return false
}

// Check returns true if the task can be scheduled into the given node.
func (f *PlatformFilter) Check(n *NodeInfo) bool {
// if the supportedPlatforms field is empty, then either it wasn't
// provided or there are no constraints
if len(f.supportedPlatforms) == 0 {
return true
}
// check if the platform for the node is supported
nodePlatform := n.Description.Platform
for _, p := range f.supportedPlatforms {
if (p.Architecture == "" || p.Architecture == nodePlatform.Architecture) && (p.OS == "" || p.OS == nodePlatform.OS) {
return true
}
}
return false
}

// Explain returns an explanation of a failure.
func (f *PlatformFilter) Explain(nodes int) string {
if nodes == 1 {
return "unsupported platform on 1 node"

This comment has been minimized.

Copy link
@erikbgithub

erikbgithub Jul 23, 2017

This is not explaining anything. Which node is unsupported? Which platforms are supported? What did you get as value for this node?

Btw how can you get apparently two working nodes and one non-working node with 3 raspberries? Shouldn't they either all be unsupported or all supported?

}
return fmt.Sprintf("unsupported platform on %d nodes", nodes)
}
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
&ResourceFilter{},
&PluginFilter{},
&ConstraintFilter{},
&PlatformFilter{},
}
)

Expand Down
207 changes: 207 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,213 @@ func TestSchedulerPreexistingDeadTask(t *testing.T) {
assert.Equal(t, "id1", assignment.NodeID)
}

func TestSchedulerCompatiblePlatform(t *testing.T) {
ctx := context.Background()
// create tasks
// task1 - has a node it can run on
task1 := &api.Task{
ID: "id1",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "amd64",
OS: "linux",
},
},
},
},
}

// task2 - has no node it can run on
task2 := &api.Task{
ID: "id2",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "intel",
OS: "linux",
},
},
},
},
}

// task3 - no platform constraints, should run on any node
task3 := &api.Task{
ID: "id3",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name3",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}

// task4 - only OS constraint, is runnable on any linux node
task4 := &api.Task{
ID: "id4",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name4",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "",
OS: "linux",
},
},
},
},
}

// task5 - supported on multiple platforms
task5 := &api.Task{
ID: "id5",
DesiredState: api.TaskStateRunning,
ServiceAnnotations: api.Annotations{
Name: "name5",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Spec: api.TaskSpec{
Placement: &api.Placement{
Platforms: []*api.Platform{
{
Architecture: "amd64",
OS: "linux",
},
{
Architecture: "amd64",
OS: "windows",
},
},
},
},
}

node1 := &api.Node{
ID: "node1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Description: &api.NodeDescription{
Platform: &api.Platform{
Architecture: "amd64",
OS: "linux",
},
},
}

node2 := &api.Node{
ID: "node2",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node2",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Description: &api.NodeDescription{
Platform: &api.Platform{
Architecture: "amd64",
OS: "windows",
},
},
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial task and nodes to the store
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
return nil
})
assert.NoError(t, err)

scheduler := New(s)

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()

// task1 should get assigned
assignment1 := watchAssignment(t, watch)
assert.Equal(t, "node1", assignment1.NodeID)

// add task2
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task2))
return nil
})
assert.NoError(t, err)
failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (unsupported platform on 2 nodes)", failure.Status.Message)

// add task3
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task3))
return nil
})
assert.NoError(t, err)
assignment2 := watchAssignment(t, watch)
assert.Regexp(t, assignment2.NodeID, "(node1|node2)")

// add task4
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task4))
return nil
})
assert.NoError(t, err)
assignment3 := watchAssignment(t, watch)
assert.Equal(t, "node1", assignment3.NodeID)

// add task5
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task5))
return nil
})
assert.NoError(t, err)
assignment4 := watchAssignment(t, watch)
assert.Regexp(t, assignment4.NodeID, "(node1|node2)")
}

func TestPreassignedTasks(t *testing.T) {
ctx := context.Background()
initialNodeSet := []*api.Node{
Expand Down

0 comments on commit 8edbb92

Please sign in to comment.