Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Pod's ownership to inject metrics collector #1303

Merged
merged 6 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/webhook/v1beta1/pod/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
const (
MasterRole = "master"
BatchJob = "Job"
// TrialKind is the name of Trial kind
TrialKind = "Trial"
// TrialAPIVersion is the name of Trial API Version
TrialAPIVersion = "kubeflow.org/v1beta1"
)

var (
Expand Down
206 changes: 64 additions & 142 deletions pkg/webhook/v1beta1/pod/inject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package pod

import (
"context"
"fmt"
"errors"
"net/http"
"path/filepath"
"strings"

"github.com/spf13/viper"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -35,10 +35,8 @@ import (

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
katibmanagerv1beta1 "github.com/kubeflow/katib/pkg/common/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1"
mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common"
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
"github.com/kubeflow/katib/pkg/util/v1beta1/katibconfig"
)

Expand Down Expand Up @@ -108,7 +106,13 @@ func NewSidecarInjector(c client.Client) *sidecarInjector {
}

func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error) {
jobKind, jobName, err := getKatibJob(pod)
object, err := util.ConvertObjectToUnstructured(pod)
if err != nil {
return false, err
}

// Try to get Katib job kind and job name from mutating pod
jobKind, jobName, err := s.getKatibJob(object, ns)
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -141,9 +145,17 @@ func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error)
func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) {
mutatedPod := pod.DeepCopy()

kind, trialName, _ := getKatibJob(pod)
object, err := util.ConvertObjectToUnstructured(pod)
if err != nil {
return nil, err
}

// Try to get Katib job kind and job name from mutating pod
jobKind, jobName, _ := s.getKatibJob(object, namespace)

trial := &trialsv1beta1.Trial{}
if err := s.client.Get(context.TODO(), apitypes.NamespacedName{Name: trialName, Namespace: namespace}, trial); err != nil {
// jobName and Trial name is equal
if err := s.client.Get(context.TODO(), apitypes.NamespacedName{Name: jobName, Namespace: namespace}, trial); err != nil {
return nil, err
}

Expand All @@ -157,16 +169,16 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)

mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector)
if mountPath != "" {
if err = mutateVolume(mutatedPod, kind, mountPath, injectContainer.Name, pathKind); err != nil {
if err = mutateVolume(mutatedPod, jobKind, mountPath, injectContainer.Name, pathKind); err != nil {
return nil, err
}
}
if needWrapWorkerContainer(trial.Spec.MetricsCollector) {
if err = wrapWorkerContainer(mutatedPod, namespace, kind, mountPath, pathKind, trial.Spec.MetricsCollector); err != nil {
if err = wrapWorkerContainer(mutatedPod, namespace, jobKind, mountPath, pathKind, trial.Spec.MetricsCollector); err != nil {
return nil, err
}
}
log.Info("Inject metrics collector sidecar container", "Pod Generate Name", mutatedPod.GenerateName, "Trial", trialName)
log.Info("Inject metrics collector sidecar container", "Pod Generate Name", mutatedPod.GenerateName, "Trial", jobName)
return mutatedPod, nil
}

Expand Down Expand Up @@ -206,143 +218,53 @@ func (s *sidecarInjector) getMetricsCollectorContainer(trial *trialsv1beta1.Tria
return &injectContainer, nil
}

func getMetricsCollectorArgs(trialName, metricName string, mc common.MetricsCollectorSpec) []string {
args := []string{"-t", trialName, "-m", metricName, "-s", katibmanagerv1beta1.GetDBManagerAddr()}
if mountPath, _ := getMountPath(mc); mountPath != "" {
args = append(args, "-path", mountPath)
}
if mc.Source != nil && mc.Source.Filter != nil && len(mc.Source.Filter.MetricsFormat) > 0 {
args = append(args, "-f", strings.Join(mc.Source.Filter.MetricsFormat, ";"))
}
return args
}

func getMountPath(mc common.MetricsCollectorSpec) (string, common.FileSystemKind) {
if mc.Collector.Kind == common.StdOutCollector {
return common.DefaultFilePath, common.FileKind
} else if mc.Collector.Kind == common.FileCollector {
return mc.Source.FileSystemPath.Path, common.FileKind
} else if mc.Collector.Kind == common.TfEventCollector {
return mc.Source.FileSystemPath.Path, common.DirectoryKind
} else if mc.Collector.Kind == common.CustomCollector {
if mc.Source == nil || mc.Source.FileSystemPath == nil {
return "", common.InvalidKind
}
return mc.Source.FileSystemPath.Path, mc.Source.FileSystemPath.Kind
} else {
return "", common.InvalidKind
}
}

func needWrapWorkerContainer(mc common.MetricsCollectorSpec) bool {
mcKind := mc.Collector.Kind
for _, kind := range NeedWrapWorkerMetricsCollecterList {
if mcKind == kind {
return true
}
}
return false
}

func wrapWorkerContainer(
pod *v1.Pod, namespace, jobKind, metricsFile string,
pathKind common.FileSystemKind,
mc common.MetricsCollectorSpec) error {
index := -1
for i, c := range pod.Spec.Containers {
jobProvider, err := jobv1beta1.New(jobKind)
if err != nil {
return err
}
if jobProvider.IsTrainingContainer(i, c) {
index = i
break
func (s *sidecarInjector) getKatibJob(object *unstructured.Unstructured, namespace string) (string, string, error) {
owners := object.GetOwnerReferences()
// jobKind and jobName points to the object kind and name that Trial is created
jobKind := ""
jobName := ""
// Search for Trial owner in object owner references
// Trial is owned object if kind = Trial kind and API version = Trial API version
for _, owner := range owners {
if owner.Kind == TrialKind && owner.APIVersion == TrialAPIVersion {
jobKind = object.GetKind()
jobName = object.GetName()
}
}
if index >= 0 {
command := []string{"sh", "-c"}
args, err := getContainerCommand(pod, namespace, index)
if err != nil {
return err
}
// If the first two commands are sh -c, we do not inject command.
if args[0] == "sh" || args[0] == "bash" {
if args[1] == "-c" {
command = args[0:2]
args = args[2:]
// If Trial is not found in object owners search for nested owners
if jobKind == "" {
i := 0
// Search for Trial ownership unless jobKind is empty and owners is exists
for jobKind == "" && i < len(owners) {
nestedJob := &unstructured.Unstructured{}
// Get group and version from owner API version
gv, err := schema.ParseGroupVersion(owners[i].APIVersion)
if err != nil {
return "", "", err
}
}
if mc.Collector.Kind == common.StdOutCollector {
redirectStr := fmt.Sprintf("1>%s 2>&1", metricsFile)
args = append(args, redirectStr)
}
args = append(args, "&&", getMarkCompletedCommand(metricsFile, pathKind))
argsStr := strings.Join(args, " ")
c := &pod.Spec.Containers[index]
c.Command = command
c.Args = []string{argsStr}
}
return nil
}

func getMarkCompletedCommand(mountPath string, pathKind common.FileSystemKind) string {
dir := mountPath
if pathKind == common.FileKind {
dir = filepath.Dir(mountPath)
}
// $$ is process id in shell
pidFile := filepath.Join(dir, "$$$$.pid")
return fmt.Sprintf("echo %s > %s", mccommon.TrainingCompleted, pidFile)
}

func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, pathKind common.FileSystemKind) error {
metricsVol := v1.Volume{
Name: common.MetricsVolume,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
dir := mountPath
if pathKind == common.FileKind {
dir = filepath.Dir(mountPath)
}
vm := v1.VolumeMount{
Name: metricsVol.Name,
MountPath: dir,
}
indexList := []int{}
for i, c := range pod.Spec.Containers {
shouldMount := false
if c.Name == sidecarContainerName {
shouldMount = true
} else {
jobProvider, err := jobv1beta1.New(jobKind)
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: owners[i].Kind,
}
// Set GVK for nested unstructured object
nestedJob.SetGroupVersionKind(gvk)
// Get nested object from cluster.
// Nested object namespace must be equal to object namespace
err = s.client.Get(context.TODO(), apitypes.NamespacedName{Name: owners[i].Name, Namespace: namespace}, nestedJob)
if err != nil {
return err
return "", "", err
}
shouldMount = jobProvider.IsTrainingContainer(i, c)
}
if shouldMount {
indexList = append(indexList, i)
}
}
for _, i := range indexList {
c := &pod.Spec.Containers[i]
if c.VolumeMounts == nil {
c.VolumeMounts = make([]v1.VolumeMount, 0)
// Recursively search for Trial ownership in nested object
jobKind, jobName, err = s.getKatibJob(nestedJob, namespace)
i++
}
c.VolumeMounts = append(c.VolumeMounts, vm)
pod.Spec.Containers[i] = *c
}
pod.Spec.Volumes = append(pod.Spec.Volumes, metricsVol)

return nil
}

func getSidecarContainerName(cKind common.CollectorKind) string {
if cKind == common.StdOutCollector || cKind == common.FileCollector {
return mccommon.MetricLoggerCollectorContainerName
} else {
return mccommon.MetricCollectorContainerName
// If jobKind is empty after the loop, Trial doesn't own the object
if jobKind == "" {
return "", "", errors.New("The Pod doesn't belong to Katib Job")
}

return jobKind, jobName, nil
}
Loading