Skip to content

Commit

Permalink
Added hosts into environment.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Dec 16, 2019
1 parent 2f688bd commit bf6602b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/job/plugins/svc/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package svc
const (
// ConfigMapTaskHostFmt key in config map
ConfigMapTaskHostFmt = "%s.host"
// EnvTaskHostFmt is the key for host list in environment
EnvTaskHostFmt = "VC_%s_HOSTS"
// EnvHostNumFmt is the key for host number in environment
EnvHostNumFmt = "VC_%s_NUM"

// ConfigMapMountPath mount path
ConfigMapMountPath = "/etc/volcano"
Expand Down
50 changes: 44 additions & 6 deletions pkg/controllers/job/plugins/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package svc
import (
"flag"
"fmt"
"strconv"
"strings"

"k8s.io/klog"
Expand Down Expand Up @@ -88,6 +89,34 @@ func (sp *servicePlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
pod.Spec.Subdomain = job.Name
}

var hostEnv []v1.EnvVar
var envNames []string

for _, ts := range job.Spec.Tasks {
// TODO(k82cn): The splitter and the prefix of env should be configurable.
envNames = append(envNames, fmt.Sprintf(EnvTaskHostFmt, strings.ToUpper(ts.Name)))
envNames = append(envNames, fmt.Sprintf(EnvHostNumFmt, strings.ToUpper(ts.Name)))
}

for _, name := range envNames {
hostEnv = append(hostEnv, v1.EnvVar{
Name: name,
ValueFrom: &v1.EnvVarSource{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{Name: sp.cmName(job)},
Key: name,
}}},
)
}

for i := range pod.Spec.Containers {
pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, hostEnv...)
}

for i := range pod.Spec.InitContainers {
pod.Spec.InitContainers[i].Env = append(pod.Spec.InitContainers[i].Env, hostEnv...)
}

sp.mountConfigmap(pod, job)

return nil
Expand All @@ -98,9 +127,10 @@ func (sp *servicePlugin) OnJobAdd(job *batch.Job) error {
return nil
}

data := generateHost(job)
hostFile := generateHosts(job)

if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, data, sp.cmName(job)); err != nil {
// Create ConfigMap of hosts for Pods to mount.
if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, hostFile, sp.cmName(job)); err != nil {
return err
}

Expand Down Expand Up @@ -258,8 +288,8 @@ func (sp *servicePlugin) cmName(job *batch.Job) string {
return fmt.Sprintf("%s-%s", job.Name, sp.Name())
}

func generateHost(job *batch.Job) map[string]string {
data := make(map[string]string, len(job.Spec.Tasks))
func generateHosts(job *batch.Job) map[string]string {
hostFile := make(map[string]string, len(job.Spec.Tasks))

for _, ts := range job.Spec.Tasks {
hosts := make([]string, 0, ts.Replicas)
Expand All @@ -280,8 +310,16 @@ func generateHost(job *batch.Job) map[string]string {
}

key := fmt.Sprintf(ConfigMapTaskHostFmt, ts.Name)
data[key] = strings.Join(hosts, "\n")
hostFile[key] = strings.Join(hosts, "\n")

// TODO(k82cn): The splitter and the prefix of env should be configurable.
// export hosts as environment
key = fmt.Sprintf(EnvTaskHostFmt, strings.ToUpper(ts.Name))
hostFile[key] = strings.Join(hosts, ",")
// export host number as environment.
key = fmt.Sprintf(EnvHostNumFmt, strings.ToUpper(ts.Name))
hostFile[key] = strconv.Itoa(len(hosts))
}

return data
return hostFile
}
33 changes: 26 additions & 7 deletions test/e2e/job_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ package e2e

import (
"fmt"
"strings"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

cv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/api"

"volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/plugins/env"
"volcano.sh/volcano/pkg/controllers/job/plugins/svc"
)

var _ = Describe("Job E2E Test: Test Job Plugins", func() {
Expand Down Expand Up @@ -222,16 +227,30 @@ var _ = Describe("Job E2E Test: Test Job Plugins", func() {
}
Expect(foundVolume).To(BeTrue())

// Check whether env exists in the pod
for _, container := range pod.Spec.Containers {
for _, envi := range container.Env {
if envi.Name == env.TaskVkIndex {
foundEnv = true
break
// Check whether env exists in the containers and initContainers
containers := pod.Spec.Containers
containers = append(containers, pod.Spec.InitContainers...)
envNames := []string{
env.TaskVkIndex,
env.TaskIndex,
fmt.Sprintf(svc.EnvTaskHostFmt, strings.ToUpper(taskName)),
fmt.Sprintf(svc.EnvHostNumFmt, strings.ToUpper(taskName)),
}

for _, container := range containers {
for _, name := range envNames {
foundEnv = false
for _, envi := range container.Env {
if envi.Name == name {
foundEnv = true
break
}
}

Expect(foundEnv).To(BeTrue(),
fmt.Sprint("container: %s, env name: %s", container.Name, name))
}
}
Expect(foundEnv).To(BeTrue())

// Check whether service is created with job name
_, err = context.kubeclient.CoreV1().Services(job.Namespace).Get(job.Name, v1.GetOptions{})
Expand Down

0 comments on commit bf6602b

Please sign in to comment.