Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for maximum replicas per node #2758

Merged
merged 1 commit into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/api.pb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3555,6 +3555,13 @@ file {
type_name: ".docker.swarmkit.v1.Platform"
json_name: "platforms"
}
field {
name: "max_replicas"
number: 4
label: LABEL_OPTIONAL
type: TYPE_UINT64
json_name: "maxReplicas"
}
}
message_type {
name: "JoinTokens"
Expand Down
665 changes: 348 additions & 317 deletions api/types.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,9 @@ message Placement {
// This field is used in the platform filter for scheduling. If empty,
// then the platform filter is off, meaning there are no scheduling restrictions.
repeated Platform platforms = 3;

// MaxReplicas specifies the limit for maximum number of replicas running on one node.
uint64 max_replicas = 4;
}

// JoinToken contains the join tokens for workers and managers.
Expand Down
1 change: 1 addition & 0 deletions cmd/swarmctl/service/flagparser/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func AddServiceFlags(flags *pflag.FlagSet) {
flags.StringSlice("label", nil, "service label (key=value)")

flags.Uint64("replicas", 1, "number of replicas for the service (only works in replicated service mode)")
flags.Uint64("replicas-max-per-node", 0, "maximum number of replicas for per node (only works in replicated service mode) (default 0 = unlimited)")

flags.String("image", "", "container image")
flags.String("hostname", "", "container hostname")
Expand Down
16 changes: 16 additions & 0 deletions cmd/swarmctl/service/flagparser/placement.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flagparser

import (
"fmt"

"github.com/docker/swarmkit/api"
"github.com/spf13/pflag"
)
Expand All @@ -17,5 +19,19 @@ func parsePlacement(flags *pflag.FlagSet, spec *api.ServiceSpec) error {
spec.Task.Placement.Constraints = constraints
}

if flags.Changed("replicas-max-per-node") {
if spec.GetReplicated() == nil {
return fmt.Errorf("--replicas-max-per-node can only be specified in --mode replicated")
}
maxReplicas, err := flags.GetUint64("replicas-max-per-node")
if err != nil {
return err
}
if spec.Task.Placement == nil {
spec.Task.Placement = &api.Placement{}
}
spec.Task.Placement.MaxReplicas = maxReplicas
}

return nil
}
25 changes: 25 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,28 @@ func (f *HostPortFilter) Explain(nodes int) string {
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}

// MaxReplicasFilter selects only nodes on which scheduling the task would
// not exceed the service's max-replicas-per-node limit.
type MaxReplicasFilter struct {
	t *api.Task // task being scheduled; set by SetTask when the filter is active
}

// SetTask activates this filter for the given task. It returns true only
// when the task's placement specifies a positive MaxReplicas limit;
// otherwise the filter does not apply and false is returned.
func (f *MaxReplicasFilter) SetTask(t *api.Task) bool {
	placement := t.Spec.Placement
	if placement == nil || placement.MaxReplicas == 0 {
		return false
	}
	f.t = t
	return true
}

// Check reports whether node n has fewer active (assigned or pre-assigned)
// tasks of this task's service than the MaxReplicas limit, i.e. whether one
// more replica may be placed on it.
func (f *MaxReplicasFilter) Check(n *NodeInfo) bool {
	running := uint64(n.ActiveTasksCountByService[f.t.ServiceID])
	return running < f.t.Spec.Placement.MaxReplicas
}

// Explain returns an explanation of a failure.
// NOTE: callers (and the scheduler tests) match this message verbatim, so
// the exact text must not change even though its grammar is off.
func (f *MaxReplicasFilter) Explain(nodes int) string {
	const msg = "max replicas per node limit exceed"
	return msg
}
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
&MaxReplicasFilter{},
}
)

Expand Down
253 changes: 253 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3262,3 +3262,256 @@ func TestSchedulerHostPort(t *testing.T) {
failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (host-mode port already in use on 2 nodes)", failure.Status.Err)
}

func TestSchedulerMaxReplicas(t *testing.T) {
ctx := context.Background()
node1 := &api.Node{
ID: "nodeid1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
node2 := &api.Node{
ID: "nodeid2",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node2",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
task1 := &api.Task{
ID: "id1",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
MaxReplicas: 1,
},
},
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
task2 := &api.Task{
ID: "id2",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
MaxReplicas: 1,
},
},
ServiceAnnotations: api.Annotations{
Name: "name2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
task3 := &api.Task{
ID: "id3",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
MaxReplicas: 1,
olljanat marked this conversation as resolved.
Show resolved Hide resolved
},
},
ServiceAnnotations: api.Annotations{
Name: "name3",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
service1 := &api.Service{
ID: "serviceID1",
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node, service and task
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil
})
assert.NoError(t, err)

scheduler := New(s)

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()

// Tasks shouldn't be scheduled because there are no nodes.
watchAssignmentFailure(t, watch)
watchAssignmentFailure(t, watch)

err = s.Update(func(tx store.Tx) error {
// Add initial node and task
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
return nil
})
assert.NoError(t, err)

// Tasks 1 and 2 should be assigned to different nodes.
assignment1 := watchAssignment(t, watch)
assignment2 := watchAssignment(t, watch)
assert.True(t, assignment1 != assignment2)

// Task 3 should not be schedulable.
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task3))
return nil
})
assert.NoError(t, err)

failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (max replicas per node limit exceed)", failure.Status.Err)

// Add third node to get task 3 scheduled
node3 := &api.Node{
ID: "nodeid3",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node3",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNode(tx, node3))
return nil
})
assert.NoError(t, err)

// Create four more tasks to node 1
task4 := &api.Task{
ID: "id4",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
Constraints: []string{"node.hostname==node1"},
MaxReplicas: 3,
},
},
ServiceAnnotations: api.Annotations{
Name: "name4",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
task5 := &api.Task{
ID: "id5",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
Constraints: []string{"node.hostname==node1"},
MaxReplicas: 3,
},
},
ServiceAnnotations: api.Annotations{
Name: "name5",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
task6 := &api.Task{
ID: "id6",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
Constraints: []string{"node.hostname==node1"},
MaxReplicas: 3,
},
},
ServiceAnnotations: api.Annotations{
Name: "name6",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
task7 := &api.Task{
ID: "id7",
ServiceID: "serviceID1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
Placement: &api.Placement{
Constraints: []string{"node.hostname==node1"},
MaxReplicas: 3,
},
},
ServiceAnnotations: api.Annotations{
Name: "name7",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task4))
assert.NoError(t, store.CreateTask(tx, task5))
assert.NoError(t, store.CreateTask(tx, task6))
return nil
})
assert.NoError(t, err)

// Task 7 should not be schedulable.
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task7))
return nil
})
assert.NoError(t, err)

failure = watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (scheduling constraints not satisfied on 3 nodes)", failure.Status.Err)
}