diff --git a/docs/functions.rst b/docs/functions.rst index 42f92e4a02..170bca6266 100644 --- a/docs/functions.rst +++ b/docs/functions.rst @@ -150,6 +150,73 @@ Example: - | echo "Example" +KubeTaskParallel +---------------- + +KubeTaskParallel spins up a new pod with two containers connected +via shared emptyDir volume. +It's similar to KubeTask, but allows using multiple images to move backup data. +"background" container is one responsible for generating data, while "output" container +should export it to destination. +The main difference between them is that phase outputs can only generated from the +"output" container outputs. + +.. csv-table:: + :header: "Argument", "Required", "Type", "Description" + :align: left + :widths: 5,5,5,15 + + `namespace`, No, `string`, namespace in which to execute (the pod will be created in controller's namespace if not specified) + `backgroundImage`, Yes, `string`, image to be used in "background" container + `backgroundCommand`, Yes, `[]string`, command list to execute in "background" container + `outputImage`, Yes, `string`, image to be used in "output" container + `outputCommand`, Yes, `[]string`, command list to execute in "output" container + `podOverride`, No, `map[string]interface{}`, specs to override default pod specs with + `podAnnotations`, No, `map[string]string`, custom annotations for the temporary pod that gets created + `podLabels`, No, `map[string]string`, custom labels for the temporary pod that gets created + `sharedVolumeMedium`, No, `string`, medium setting for shared volume. See https://kubernetes.io/docs/concepts/storage/volumes/#emptydir + `sharedVolumeSizeLimit`, No, `string`, sizeLimit setting for shared volume + `sharedVolumeDir`, No, `string`, directory to mount shared volume. Defaults to `/tmp` + +Example: + +.. code-block:: yaml + :linenos: + + - func: KubeTaskParallel + name: examplePhase + args: + namespace: "{{ .Deployment.Namespace }}" + podOverride: + containers: + - name: export + imagePullPolicy: IfNotPresent + podAnnotations: + annKey: annValue + podLabels: + labelKey: labelValue + sharedVolumeMedium: Memory + sharedVolumeSizeLimit: 1Gi + sharedVolumeDir: /tmp/ + backgroundImage: ubuntu + backgroundCommand: + - bash + - -c + - | + mkfifo /tmp/pipe-file + for i in {1..10} + do + echo $i + sleep 0.1 + done > /tmp/pipe-file + outputImage: ubuntu + outputCommand: + - bash + - -c + - | + while [ ! -e /tmp/pipe-file ]; do sleep 1; done + cat /tmp/pipe-file + ScaleWorkload ------------- diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 143b714270..d4e1eeb191 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -63,3 +63,5 @@ webhook Kopia kopia hostname +emptyDir +sizeLimit diff --git a/docs_new/functions.md b/docs_new/functions.md index 07016e3b66..89d466f1a6 100644 --- a/docs_new/functions.md +++ b/docs_new/functions.md @@ -129,6 +129,69 @@ Example: echo "Example" ``` +### KubeTaskParallel + +KubeTaskParallel spins up a new pod with two containers connected via shared emptyDir volume. +It's similar to KubeTask, but allows using multiple images to move backup data. +"background" container is one responsible for generating data, while "output" container +should export it to destination. +The main difference between them is that phase outputs can only generated from the +"output" container outputs. + + + | Argument | Required | Type | Description | + | ----------- | :------: | ----------------------- | ----------- | + | namespace | No | string | namespace in which to execute (the pod will be created in controller's namespace if not specified) | + | backgroundImage | Yes | string | image to be used in "background" container | + | backgroundCommand | Yes | []string | command list to execute in "background" container | + | outputImage | Yes | string | image to be used in "output" container | + | outputCommand | Yes | []string | command list to execute in "output" container | + | podOverride | No | map[string]interface{} | specs to override default pod specs with | + | podAnnotations | No | map[string]string | custom annotations for the temporary pod that gets created | + | podLabels | No | map[string]string | custom labels for the temporary pod that gets created | + | sharedVolumeMedium | No | string | medium setting for shared volume, see https://kubernetes.io/docs/concepts/storage/volumes/#emptydir | + | sharedVolumeSizeLimit | No | string | sizeLimit setting for shared volume | + | sharedVolumeDir | No | string | directory to mount shared volume, defaults to `/tmp` | + + +Example: + +``` yaml +- func: KubeTaskParallel + name: examplePhase + args: + namespace: "{{ .Deployment.Namespace }}" + podOverride: + containers: + - name: export + imagePullPolicy: IfNotPresent + podAnnotations: + annKey: annValue + podLabels: + labelKey: labelValue + sharedVolumeMedium: Memory + sharedVolumeSizeLimit: 1Gi + sharedVolumeDir: /tmp/ + backgroundImage: ubuntu + backgroundCommand: + - bash + - -c + - | + mkfifo /tmp/pipe-file + for i in {1..10} + do + echo $i + sleep 0.1 + done > /tmp/pipe-file + outputImage: ubuntu + outputCommand: + - bash + - -c + - | + while [ ! -e /tmp/pipe-file ]; do sleep 1; done + cat /tmp/pipe-file +``` + ### ScaleWorkload ScaleWorkload is used to scale up or scale down a Kubernetes workload. diff --git a/pkg/function/kube_task_parallel.go b/pkg/function/kube_task_parallel.go new file mode 100644 index 0000000000..8bea4f2183 --- /dev/null +++ b/pkg/function/kube_task_parallel.go @@ -0,0 +1,325 @@ +// +// Copyright 2019 The Kanister Authors. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package function + +import ( + "context" + "sort" + "time" + + "github.com/kanisterio/errkit" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + kanister "github.com/kanisterio/kanister/pkg" + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/field" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/log" + "github.com/kanisterio/kanister/pkg/output" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/progress" + "github.com/kanisterio/kanister/pkg/utils" +) + +const ( + // KubeTaskParallelFuncName gives the function name + KubeTaskParallelFuncName = "KubeTaskParallel" + KubeTaskParallelNamespaceArg = "namespace" + KubeTaskParallelBackgroundImageArg = "backgroundImage" + KubeTaskParallelBackgroundCommandArg = "backgroundCommand" + KubeTaskParallelOutputImageArg = "outputImage" + KubeTaskParallelOutputCommandArg = "outputCommand" + KubeTaskParallelVolumeMediumArg = "sharedVolumeMedium" + KubeTaskParallelVolumeSizeLimitArg = "sharedVolumeSizeLimit" + KubeTaskParallelSharedDirArg = "sharedVolumeDir" + KubeTaskParallelPodOverrideArg = "podOverride" +) + +const ( + ktpBackgroundContainer = "background" + ktpOutputContainer = "output" + ktpSharedVolumeName = "shared" + ktpDefaultSharedDir = "/tmp/" +) + +func init() { + _ = kanister.Register(&kubeTaskParallelFunc{}) +} + +var _ kanister.Func = (*kubeTaskParallelFunc)(nil) + +type kubeTaskParallelFunc struct { + progressPercent string + namespace string + backgroundImage string + backgroundCommand []string + outputImage string + outputCommand []string + storageDir string + storageMedium corev1.StorageMedium + storageSizeLimit *resource.Quantity + podOverride crv1alpha1.JSONMap + labels map[string]string + annotations map[string]string +} + +func (*kubeTaskParallelFunc) Name() string { + return KubeTaskParallelFuncName +} + +func (ktpf *kubeTaskParallelFunc) run( + ctx context.Context, + cli kubernetes.Interface, +) (map[string]interface{}, error) { + podSpec := corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Volumes: []corev1.Volume{ + { + Name: ktpSharedVolumeName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: ktpf.storageMedium, + SizeLimit: ktpf.storageSizeLimit, + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: ktpOutputContainer, + Image: ktpf.outputImage, + Command: ktpf.outputCommand, + VolumeMounts: []corev1.VolumeMount{ + { + Name: ktpSharedVolumeName, + MountPath: ktpf.storageDir, + }, + }, + }, + { + Name: ktpBackgroundContainer, + Image: ktpf.backgroundImage, + Command: ktpf.backgroundCommand, + VolumeMounts: []corev1.VolumeMount{ + { + Name: ktpSharedVolumeName, + MountPath: ktpf.storageDir, + }, + }, + }, + }, + } + + podSpec, err := kube.PatchDefaultPodSpecs(podSpec, ktpf.podOverride) + if err != nil { + return nil, errkit.Wrap(err, "Unable to apply podOverride", "podSpec", podSpec, "podOverride", ktpf.podOverride) + } + + // Put the output container the first + sort.Slice(podSpec.Containers, func(i, j int) bool { + return podSpec.Containers[i].Name == ktpOutputContainer + }) + + if ktpf.labels == nil { + ktpf.labels = make(map[string]string) + } + ktpf.labels[consts.LabelKeyCreatedBy] = consts.LabelValueKanister + + if ktpf.annotations == nil { + ktpf.annotations = make(map[string]string) + } + // FIXME: this doesn't work with pod controller currently so we have to reorder containers + ktpf.annotations[defaultContainerAnn] = ktpOutputContainer + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: jobPrefix, + Namespace: ktpf.namespace, + Labels: ktpf.labels, + Annotations: ktpf.annotations, + }, + Spec: podSpec, + } + + log.Info().Print("POD: ", field.M{"pod": pod}) + + pod, err = cli.CoreV1().Pods(ktpf.namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return nil, errkit.Wrap(err, "Failed to create pod") + } + pc, err := kube.NewPodControllerForExistingPod(cli, pod) + if err != nil { + return nil, err + } + + ctx = field.Context(ctx, consts.PodNameKey, pod.Name) + ctx = field.Context(ctx, consts.ContainerNameKey, pod.Spec.Containers[0].Name) + go func() { + <-ctx.Done() + err := pc.StopPod(context.Background(), kube.PodControllerInfiniteStopTime, int64(0)) + if err != nil { + log.WithError(err).Print("Failed to delete pod", field.M{"PodName": pod.Name}) + } + }() + + return getPodOutput(ctx, pc) +} + +// This function is similar to kubeTaskPodFunc +func getPodOutput(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + if err := pc.WaitForPodReady(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName()) + } + ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath) + // Fetch logs from the pod + r, err := pc.StreamPodLogs(ctx) + if err != nil { + return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") + } + out, err := output.LogAndParse(ctx, r) + if err != nil { + return nil, err + } + // Wait for pod completion + if err := pc.WaitForPodCompletion(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pc.PodName()) + } + return out, err +} + +func (ktpf *kubeTaskParallelFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { + // Set progress percent + ktpf.progressPercent = progress.StartedPercent + defer func() { ktpf.progressPercent = progress.CompletedPercent }() + + var storageSizeString string + var bpAnnotations, bpLabels map[string]string + var err error + if err = Arg(args, KubeTaskParallelBackgroundImageArg, &ktpf.backgroundImage); err != nil { + return nil, err + } + if err = Arg(args, KubeTaskParallelOutputImageArg, &ktpf.outputImage); err != nil { + return nil, err + } + if err = Arg(args, KubeTaskParallelBackgroundCommandArg, &ktpf.backgroundCommand); err != nil { + return nil, err + } + if err = Arg(args, KubeTaskParallelOutputCommandArg, &ktpf.outputCommand); err != nil { + return nil, err + } + if err = OptArg(args, KubeTaskParallelNamespaceArg, &ktpf.namespace, ""); err != nil { + return nil, err + } + if err = OptArg(args, KubeTaskParallelVolumeMediumArg, &ktpf.storageMedium, ""); err != nil { + return nil, err + } + if err = OptArg(args, KubeTaskParallelVolumeSizeLimitArg, &storageSizeString, ""); err != nil { + return nil, err + } + if storageSizeString != "" { + size, err := resource.ParseQuantity(storageSizeString) + if err != nil { + return nil, errors.Wrapf(err, "Failed to parse sharedStorageSize arg") + } + ktpf.storageSizeLimit = &size + } + if err = OptArg(args, KubeTaskParallelSharedDirArg, &ktpf.storageDir, ktpDefaultSharedDir); err != nil { + return nil, err + } + if err = OptArg(args, PodAnnotationsArg, &bpAnnotations, nil); err != nil { + return nil, err + } + if err = OptArg(args, PodLabelsArg, &bpLabels, nil); err != nil { + return nil, err + } + + ktpf.podOverride, err = GetPodSpecOverride(tp, args, KubeTaskParallelPodOverrideArg) + if err != nil { + return nil, err + } + + ktpf.labels = bpLabels + ktpf.annotations = bpAnnotations + if tp.PodAnnotations != nil { + // merge the actionset annotations with blueprint annotations + var actionSetAnn ActionSetAnnotations = tp.PodAnnotations + ktpf.annotations = actionSetAnn.MergeBPAnnotations(bpAnnotations) + } + + if tp.PodLabels != nil { + // merge the actionset labels with blueprint labels + var actionSetLabels ActionSetLabels = tp.PodLabels + ktpf.labels = actionSetLabels.MergeBPLabels(bpLabels) + } + + cli, err := kube.NewClient() + if err != nil { + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") + } + return ktpf.run( + ctx, + cli, + ) +} + +func (*kubeTaskParallelFunc) RequiredArgs() []string { + return []string{ + KubeTaskParallelBackgroundImageArg, + KubeTaskParallelBackgroundCommandArg, + KubeTaskParallelOutputImageArg, + KubeTaskParallelOutputCommandArg, + } +} + +func (*kubeTaskParallelFunc) Arguments() []string { + return []string{ + KubeTaskParallelNamespaceArg, + KubeTaskParallelBackgroundImageArg, + KubeTaskParallelBackgroundCommandArg, + KubeTaskParallelOutputImageArg, + KubeTaskParallelOutputCommandArg, + KubeTaskParallelVolumeMediumArg, + KubeTaskParallelVolumeSizeLimitArg, + KubeTaskParallelSharedDirArg, + KubeTaskParallelPodOverrideArg, + PodLabelsArg, + PodAnnotationsArg, + } +} + +func (ktpf *kubeTaskParallelFunc) Validate(args map[string]any) error { + if err := ValidatePodLabelsAndAnnotations(ktpf.Name(), args); err != nil { + return err + } + + if err := utils.CheckSupportedArgs(ktpf.Arguments(), args); err != nil { + return err + } + + return utils.CheckRequiredArgs(ktpf.RequiredArgs(), args) +} + +func (k *kubeTaskParallelFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) { + metav1Time := metav1.NewTime(time.Now()) + return crv1alpha1.PhaseProgress{ + ProgressPercent: k.progressPercent, + LastTransitionTime: &metav1Time, + }, nil +} diff --git a/pkg/function/kube_task_parallel_test.go b/pkg/function/kube_task_parallel_test.go new file mode 100644 index 0000000000..ce0cb0b71e --- /dev/null +++ b/pkg/function/kube_task_parallel_test.go @@ -0,0 +1,132 @@ +// Copyright 2019 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package function + +import ( + "context" + "os" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + kanister "github.com/kanisterio/kanister/pkg" + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/param" + + . "gopkg.in/check.v1" +) + +var _ = Suite(&KubeTaskParallelSuite{}) + +type KubeTaskParallelSuite struct { + cli kubernetes.Interface + namespace string +} + +func (s *KubeTaskParallelSuite) SetUpSuite(c *C) { + cli, err := kube.NewClient() + c.Assert(err, IsNil) + s.cli = cli + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "kanister-kubetaskparalleltest-", + }, + } + cns, err := s.cli.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + c.Assert(err, IsNil) + s.namespace = cns.Name + err = os.Setenv("POD_NAMESPACE", cns.Name) + c.Assert(err, IsNil) + err = os.Setenv("POD_SERVICE_ACCOUNT", "default") + c.Assert(err, IsNil) +} + +func (s *KubeTaskParallelSuite) TearDownSuite(c *C) { + if s.namespace != "" { + _ = s.cli.CoreV1().Namespaces().Delete(context.TODO(), s.namespace, metav1.DeleteOptions{}) + } +} + +func kubeTaskParallelPhase(namespace string) crv1alpha1.BlueprintPhase { + return crv1alpha1.BlueprintPhase{ + Name: "testKubeTaskParallel", + Func: KubeTaskParallelFuncName, + Args: map[string]interface{}{ + KubeTaskParallelNamespaceArg: namespace, + KubeTaskParallelBackgroundImageArg: consts.LatestKanisterToolsImage, + KubeTaskParallelBackgroundCommandArg: []string{ + "sh", + "-c", + "echo foo > /tmp/file", + }, + KubeTaskParallelOutputImageArg: consts.LatestKanisterToolsImage, + KubeTaskParallelOutputCommandArg: []string{ + "sh", + "-c", + "while [ ! -e /tmp/file ]; do sleep 1; done; kando output value $(cat /tmp/file)", + }, + }, + } +} + +func (s *KubeTaskParallelSuite) TestKubeTaskParallel(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + tp := param.TemplateParams{ + StatefulSet: ¶m.StatefulSetParams{ + Namespace: s.namespace, + }, + PodOverride: crv1alpha1.JSONMap{ + "containers": []map[string]interface{}{ + { + "name": "background", + "imagePullPolicy": "Always", + }, + { + "name": "output", + "imagePullPolicy": "Always", + }, + }, + }, + } + action := "test" + for _, tc := range []struct { + bp *crv1alpha1.Blueprint + outs []map[string]interface{} + }{ + { + bp: newTaskBlueprint(kubeTaskParallelPhase(s.namespace)), + outs: []map[string]interface{}{ + { + "value": "foo", + }, + }, + }, + } { + phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp) + c.Assert(err, IsNil) + c.Assert(phases, HasLen, len(tc.outs)) + for i, p := range phases { + out, err := p.Exec(ctx, *tc.bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + c.Assert(out, DeepEquals, tc.outs[i]) + } + } +} diff --git a/pkg/function/utils.go b/pkg/function/utils.go index 9d73e7caad..611bec8e7b 100644 --- a/pkg/function/utils.go +++ b/pkg/function/utils.go @@ -38,6 +38,10 @@ const ( PodAnnotationsArg = "podAnnotations" ) +const ( + defaultContainerAnn = "kubectl.kubernetes.io/default-container" +) + // ValidateCredentials verifies if the given credentials have appropriate values set func ValidateCredentials(creds *param.Credential) error { if creds == nil { diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index 339189110a..5819d02a00 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -172,7 +172,7 @@ func GetPodObjectFromPodOptions(ctx context.Context, cli kubernetes.Interface, o } // Patch default Pod Specs if needed - patchedSpecs, err := patchDefaultPodSpecs(defaultSpecs, opts.PodOverride) + patchedSpecs, err := PatchDefaultPodSpecs(defaultSpecs, opts.PodOverride) if err != nil { return nil, errkit.Wrap(err, "Failed to create pod. Failed to override pod specs.", "namespace", opts.Namespace, "nameFmt", opts.GenerateName) } @@ -515,8 +515,8 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa return errkit.Wrap(err, errorMessage) } -// use Strategic Merge to patch default pod specs with the passed specs -func patchDefaultPodSpecs(defaultPodSpecs corev1.PodSpec, override crv1alpha1.JSONMap) (corev1.PodSpec, error) { +// PatchDefaultPodSpecs paches default pod specs with the passed override using Strategic Merge. +func PatchDefaultPodSpecs(defaultPodSpecs corev1.PodSpec, override crv1alpha1.JSONMap) (corev1.PodSpec, error) { // Merge default specs and override specs with StrategicMergePatch mergedPatch, err := strategicMergeJSONPatch(defaultPodSpecs, override) if err != nil { diff --git a/pkg/kube/pod_test.go b/pkg/kube/pod_test.go index 3033f03a20..2426a88ea8 100644 --- a/pkg/kube/pod_test.go +++ b/pkg/kube/pod_test.go @@ -845,7 +845,7 @@ func (s *PodSuite) TestPatchDefaultPodSpecs(c *C) { for _, test := range tests { override, err := CreateAndMergeJSONPatch(test.BlueprintPodSpecs, test.ActionsetPodSpecs) c.Assert(err, IsNil) - podSpec, err := patchDefaultPodSpecs(defaultSpecs, override) + podSpec, err := PatchDefaultPodSpecs(defaultSpecs, override) c.Assert(err, IsNil) c.Assert(podSpec, DeepEquals, test.Expected) } diff --git a/releasenotes/notes/kube-task-parallel-function-d488516c0f3b22c6.yaml b/releasenotes/notes/kube-task-parallel-function-d488516c0f3b22c6.yaml new file mode 100644 index 0000000000..1a579bce51 --- /dev/null +++ b/releasenotes/notes/kube-task-parallel-function-d488516c0f3b22c6.yaml @@ -0,0 +1,2 @@ +--- +features: Introduce new KubeTaskParallel function to run pods with two containers connected by shared volume