diff --git a/cli/pipeline/run.go b/cli/pipeline/run.go index a389351..ed17615 100644 --- a/cli/pipeline/run.go +++ b/cli/pipeline/run.go @@ -128,6 +128,10 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, podDefinition.parseSecrets(flags.Secrets) podDefinition.parseEnv(flags.Env) + if podDefinition.needsVolume() { + createVolumeClaim(clientset, podDefinition, flags) + } + stepPodBuffer := podDefinition.compile() pod := &v1.Pod{} yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) @@ -160,6 +164,8 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, } }() + removed := map[string]bool{} + for { select { case e := <-watch: @@ -174,7 +180,10 @@ func runPipelineStep(pipeline *PipelineDefinition, step *PipelineDefinitionStep, log.Println("[paddle] Pod deleted") return errors.New("Pod was deleted unexpectedly.") case Removed: - log.Printf("[paddle] Container removed: %s", e.Container) + if !removed[e.Container] { + log.Printf("[paddle] Container removed: %s", e.Container) + } + removed[e.Container] = true continue case Completed: log.Printf("[paddle] Pod execution completed") @@ -242,3 +251,45 @@ func deleteAndWait(c kubernetes.Interface, podDefinition *PodDefinition, flags * }) return err } + +func createVolumeClaim(c kubernetes.Interface, podDefinition *PodDefinition, flags *runCmdFlagsStruct) error { + log.Printf("[paddle] Creating volume claim for %s", podDefinition.PodName) + claim := &v1.PersistentVolumeClaim{} + claimBuffer := podDefinition.compileVolumeClaim() + yaml.NewYAMLOrJSONDecoder(claimBuffer, 4096).Decode(claim) + + claims := clientset.CoreV1().PersistentVolumeClaims(podDefinition.Namespace) + + deleting := false + var gracePeriod int64 + opts := metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod} + err := wait.PollImmediate(flags.DeletePollInterval, deleteTimeout, func() (bool, error) { + var err error + err = claims.Delete(claim.Name, &opts) + if err != nil { + if k8errors.IsNotFound(err) { + if deleting { + log.Printf("[paddle] Deleted volume clain %s", claim.Name) + } + return true, nil + } else { + return true, err + } + } + if !deleting { + log.Printf("[paddle] Deleting volume claim %s", claim.Name) + deleting = true + } + return false, nil + }) + if err != nil { + return err + } + + _, err = claims.Create(claim) + if err != nil { + return err + } + log.Printf("[paddle] Created volume claim %s", claim.Name) + return nil +} diff --git a/cli/pipeline/template.go b/cli/pipeline/template.go index 7e397da..45942e1 100644 --- a/cli/pipeline/template.go +++ b/cli/pipeline/template.go @@ -31,6 +31,10 @@ type PodDefinition struct { Step PipelineDefinitionStep } +func (d *PodDefinition) needsVolume() bool { + return d.Step.Resources.Storage != 0 +} + const podTemplate = ` apiVersion: v1 kind: Pod @@ -47,8 +51,13 @@ spec: volumes: - name: shared-data + {{ if ne .Step.Resources.Storage 0 }} + persistentVolumeClaim: + claimName: {{ .PodName }}-volume-claim + {{ else }} emptyDir: medium: '' + {{ end }} containers: - name: main @@ -62,10 +71,6 @@ spec: limits: cpu: "{{ .Step.Resources.CPU }}" memory: "{{ .Step.Resources.Memory }}" - {{ if ne .Step.Resources.Storage 0 }} - requests: - ephemeral-storage: {{ .Step.Resources.Storage }}Mi - {{ end }} command: - "/bin/sh" - "-c" @@ -114,11 +119,6 @@ spec: - name: shared-data mountPath: /data - {{ if ne .Step.Resources.Storage 0 }} - resources: - requests: - ephemeral-storage: {{ .Step.Resources.Storage }}Mi - {{ end }} command: - "/bin/sh" - "-c" @@ -164,6 +164,20 @@ spec: key: aws-secret-access-key ` +const volumeTemplate = ` +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: {{ .PodName }}-volume-claim +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Step.Resources.Storage }}Mi + persistentVolumeReclaimPolicy: Delete +` + func NewPodDefinition(pipelineDefinition *PipelineDefinition, pipelineDefinitionStep *PipelineDefinitionStep) *PodDefinition { stepName := sanitizeName(pipelineDefinitionStep.Step) branchName := sanitizeName(pipelineDefinitionStep.Branch) @@ -195,6 +209,16 @@ func (p PodDefinition) compile() *bytes.Buffer { return buffer } +func (p PodDefinition) compileVolumeClaim() *bytes.Buffer { + tmpl := template.Must(template.New("volumeTemplate").Parse(volumeTemplate)) + buffer := new(bytes.Buffer) + err := tmpl.Execute(buffer, p) + if err != nil { + panic(err.Error()) + } + return buffer +} + func (p *PodDefinition) parseSecrets(secrets []string) { for _, secret := range secrets { secretParts := strings.Split(secret, ":") diff --git a/cli/pipeline/template_test.go b/cli/pipeline/template_test.go index dc648cb..95a495b 100644 --- a/cli/pipeline/template_test.go +++ b/cli/pipeline/template_test.go @@ -29,25 +29,6 @@ func TestCompileTemplate(t *testing.T) { if pod.Spec.Containers[0].Image != pipeline.Steps[0].Image { t.Errorf("First image is %s", pod.Spec.Containers[0].Image) } - - if !pod.Spec.Containers[0].Resources.Requests.StorageEphemeral().IsZero() { - t.Errorf("Storage requirements is %v, expected", pod.Spec.Containers[0].Resources.Requests.StorageEphemeral()) - } - - podDefinition = NewPodDefinition(pipeline, &pipeline.Steps[1]) - - stepPodBuffer = podDefinition.compile() - - pod = &v1.Pod{} - yaml.NewYAMLOrJSONDecoder(stepPodBuffer, 4096).Decode(pod) - - if pod.Spec.Containers[0].Resources.Requests.StorageEphemeral().Value() != int64(1048576000) { - t.Errorf("Storage requirements is %v, expected %v", pod.Spec.Containers[1].Resources.Requests.StorageEphemeral().Value(), pipeline.Steps[1].Resources.Storage) - } - - if pod.Spec.Containers[1].Resources.Requests.StorageEphemeral().Value() != int64(1048576000) { - t.Errorf("Storage requirements is %v, expected %v", pod.Spec.Containers[1].Resources.Requests.StorageEphemeral().Value(), pipeline.Steps[1].Resources.Storage) - } } func TestSecrets(t *testing.T) {