Merge pull request volcano-sh#343 from k82cn/ka_340
Added PodFitsHostPorts in kube-batchd.
k82cn authored Sep 4, 2018
2 parents 14378d7 + a004e2f commit 7bc91f5
Showing 6 changed files with 151 additions and 27 deletions.
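In short, the change adds a hostport plugin that wraps the upstream PodFitsHostPorts predicate, registers it in the scheduler's plugin factory, enables it in the default scheduler configuration, and adds an end-to-end test (with supporting helpers) for host-port conflicts.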
1 change: 1 addition & 0 deletions hack/.golint_failures
@@ -12,6 +12,7 @@ pkg/scheduler/cache
pkg/scheduler/framework
pkg/scheduler/plugins/drf
pkg/scheduler/plugins/gang
pkg/scheduler/plugins/hostport
pkg/scheduler/plugins/nodeaffinity
pkg/scheduler/plugins/priority
pkg/scheduler/plugins/proportion
6 changes: 4 additions & 2 deletions pkg/scheduler/factory.go
@@ -23,6 +23,7 @@ import (

"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/drf"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/gang"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/hostport"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/nodeaffinity"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/priority"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/plugins/proportion"
@@ -32,10 +33,11 @@ import (

func init() {
// Plugins for Jobs
framework.RegisterPluginBuilder("priority", priority.New)
framework.RegisterPluginBuilder("gang", gang.New)
framework.RegisterPluginBuilder("drf", drf.New)
framework.RegisterPluginBuilder("gang", gang.New)
framework.RegisterPluginBuilder("hostport", hostport.New)
framework.RegisterPluginBuilder("nodeaffinity", nodeaffinity.New)
framework.RegisterPluginBuilder("priority", priority.New)

// Plugins for Queues
framework.RegisterPluginBuilder("proportion", proportion.New)
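Note: the names registered here are the handles the scheduler configuration refers to; the new "hostport" entry must match the name appended to the "plugins" list in defaultSchedulerConf (see pkg/scheduler/util.go below).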
62 changes: 62 additions & 0 deletions pkg/scheduler/plugins/hostport/hostport.go
@@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hostport

import (
"fmt"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/cache"

"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/api"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/scheduler/framework"
)

type hostportPlugin struct {
args *framework.PluginArgs
}

func New(args *framework.PluginArgs) framework.Plugin {
return &hostportPlugin{
args: args,
}
}

func (pp *hostportPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo := cache.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)
fit, _, err := predicates.PodFitsHostPorts(task.Pod, nil, nodeInfo)
if err != nil {
return err
}

glog.V(3).Infof("Predicates Task <%s/%s> on Node <%s>: fit %t, err %v",
task.Namespace, task.Name, node.Name, fit, err)

if !fit {
return fmt.Errorf("task <%s/%s> does not fit node <%s>: host port conflict",
task.Namespace, task.Name, node.Name)
}

return nil
})
}

func (pp *hostportPlugin) OnSessionClose(ssn *framework.Session) {}
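For illustration, here is a minimal, self-contained sketch (not part of the commit) of how the wrapped predicate behaves when two pods claim the same host port. It assumes the same k8s.io/kubernetes scheduler packages imported above; the podWithHostPort helper and the pod/node names are hypothetical:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/pkg/scheduler/cache"
)

// podWithHostPort builds a single-container pod that pins the given host port.
func podWithHostPort(name string, port int32) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:  name,
				Ports: []v1.ContainerPort{{ContainerPort: port, HostPort: port}},
			}},
		},
	}
}

func main() {
	// Model a node that already runs one pod occupying host port 28080,
	// mirroring what the plugin does with node.Pods() and node.Node.
	nodeInfo := cache.NewNodeInfo(podWithHostPort("occupant", 28080))
	nodeInfo.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}})

	// A second pod requesting the same host port should not fit.
	fit, _, err := predicates.PodFitsHostPorts(podWithHostPort("newcomer", 28080), nil, nodeInfo)
	fmt.Printf("fit=%t err=%v\n", fit, err) // expected output: fit=false err=<nil>
}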
2 changes: 1 addition & 1 deletion pkg/scheduler/util.go
@@ -28,7 +28,7 @@ import (

var defaultSchedulerConf = map[string]string{
"actions": "reclaim, allocate, preempt",
"plugins": "gang, priority, drf, nodeaffinity, proportion",
"plugins": "gang, priority, drf, nodeaffinity, proportion, hostport",
"plugin.gang.jobready": "true",
"plugin.gang.joborder": "true",
"plugin.gang.preemptable": "true",
27 changes: 18 additions & 9 deletions test/e2e.go
@@ -21,20 +21,12 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
)

var _ = Describe("E2E Test", func() {
It("Schedule Job with SchedulerName", func() {
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)
job := createQueueJobWithScheduler(context, "kube-batchd", "qj-1", 2, rep, "busybox", oneCPU, nil)
err := waitJobReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())
})

It("Schedule Job", func() {
context := initTestContext()
defer cleanupTestContext(context)
@@ -196,4 +188,21 @@ var _ = Describe("E2E Test", func() {
err = waitTasksReady(context, jobName1, expected)
Expect(err).NotTo(HaveOccurred())
})

It("Hostport", func() {
context := initTestContext()
defer cleanupTestContext(context)

nn := clusterNodeNumber(context)

containers := createContainers("nginx", oneCPU, 28080)
job := createJobWithOptions(context, "kube-batchd", "qj-1", int32(nn), int32(nn*2), nil, containers)

err := waitTasksReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())

err = waitTasksNotReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())
})

})
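A note on the arithmetic in the Hostport spec above: every replica pins host port 28080, so PodFitsHostPorts admits at most one task per node; with nn schedulable nodes and nn*2 replicas, nn tasks become ready while the remaining nn stay Pending, which is exactly what waitTasksReady and waitTasksNotReady assert.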
80 changes: 65 additions & 15 deletions test/util.go
@@ -207,16 +207,38 @@ func createJob(
req v1.ResourceList,
affinity *v1.Affinity,
) *batchv1.Job {
- return createQueueJobWithScheduler(context, "kube-batchd", name, min, rep, img, req, affinity)
containers := createContainers(img, req, 0)
return createJobWithOptions(context, "kube-batchd", name, min, rep, affinity, containers)
}

- func createQueueJobWithScheduler(context *context,
func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Container {
container := v1.Container{
Image: img,
Name: img,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
},
}

if hostport > 0 {
container.Ports = []v1.ContainerPort{
{
ContainerPort: hostport,
HostPort: hostport,
},
}
}

return []v1.Container{container}
}
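As used in the Hostport spec, createContainers("nginx", oneCPU, 28080) returns a single nginx container with ContainerPort and HostPort both set to 28080; passing 0 (the createJob path) leaves Ports unset, so no host-port constraint applies.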

func createJobWithOptions(context *context,
scheduler string,
name string,
min, rep int32,
- img string,
- req v1.ResourceList,
affinity *v1.Affinity,
containers []v1.Container,
) *batchv1.Job {
queueJobName := "queuejob.k8s.io"
jns, jn := splictJobName(context, name)
@@ -237,17 +259,8 @@
Spec: v1.PodSpec{
SchedulerName: scheduler,
RestartPolicy: v1.RestartPolicyNever,
- Containers: []v1.Container{
- {
- Image: img,
- Name: jn,
- ImagePullPolicy: v1.PullIfNotPresent,
- Resources: v1.ResourceRequirements{
- Requests: req,
- },
- },
- },
- Affinity: affinity,
Containers: containers,
Affinity: affinity,
},
},
},
@@ -355,6 +368,32 @@ func taskReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
}
}

func taskNotReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
_, jn := splictJobName(ctx, jobName)

return func() (bool, error) {
queueJob, err := ctx.kubeclient.BatchV1().Jobs(ctx.namespace).Get(jn, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)

notReadyTaskNum := 0
for _, pod := range pods.Items {
if !labelSelector.Matches(labels.Set(pod.Labels)) ||
!metav1.IsControlledBy(&pod, queueJob) {
continue
}
if pod.Status.Phase == v1.PodPending {
notReadyTaskNum++
}
}

return taskNum <= notReadyTaskNum, nil
}
}

func waitJobReady(ctx *context, name string) error {
return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, -1))
}
@@ -363,6 +402,10 @@ func waitTasksReady(ctx *context, name string, taskNum int) error {
return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, taskNum))
}

func waitTasksNotReady(ctx *context, name string, taskNum int) error {
return wait.Poll(100*time.Millisecond, oneMinute, taskNotReady(ctx, name, taskNum))
}

func jobNotReady(ctx *context, jobName string) wait.ConditionFunc {
return func() (bool, error) {
queueJob, err := ctx.kubeclient.BatchV1().Jobs(ctx.namespace).Get(jobName, metav1.GetOptions{})
@@ -468,6 +511,13 @@ func clusterSize(ctx *context, req v1.ResourceList) int32 {
return res
}

func clusterNodeNumber(ctx *context) int {
nodes, err := ctx.kubeclient.CoreV1().Nodes().List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

return len(nodes.Items)
}

func computeNode(ctx *context, req v1.ResourceList) (string, int32) {
nodes, err := ctx.kubeclient.CoreV1().Nodes().List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
