Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Delete job and pods in k8s StopTask if stuck in pending #3143

Merged
merged 2 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/3143.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
plugin/k8s: clean up pending pods from cancelled jobs
```
4 changes: 2 additions & 2 deletions builtin/k8s/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func configureContainer(
}
resourceRequests[resourceName] = q
} else {
log.Warn("ignoring unrecognized k8s resources key: %q", k)
log.Warn("ignoring unrecognized k8s resources key", "key", k)
}
}

Expand Down Expand Up @@ -622,7 +622,7 @@ func (p *Platform) resourceDeploymentCreate(

// App container must have some kind of port
if len(appContainerSpec.Ports) == 0 {
log.Warn("No ports defined in waypoint.hcl - defaulting to http on port %d", DefaultServicePort)
log.Warn("No ports defined in waypoint.hcl - defaulting to http on port", "port", DefaultServicePort)
appContainerSpec.Ports = append(appContainerSpec.Ports, &Port{Port: DefaultServicePort, Name: "http"})
}

Expand Down
63 changes: 55 additions & 8 deletions builtin/k8s/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/status"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -143,13 +144,59 @@ func (p *TaskLauncher) StopTask(
log hclog.Logger,
ti *TaskInfo,
) error {
// Purposely do nothing. We leverage the job TTL feature in Kube 1.19+
// so that Kubernetes automatically deletes old jobs after they complete
// running.
// If a job completes and the corresponding pod exits with a "completed"
// status, we purposely do nothing here. We leverage the job TTL feature in
// Kube 1.19+ so that Kubernetes automatically deletes old jobs and pods
// after they complete running.
//
// In the future, we may want to get more clever about this and explicitly
// delete jobs under certain conditions, but for now we leave them around
// and let K8S clean it up
// If a Waypoint job is cancelled or otherwise times out, we check for
// existing Kubernetes jobs and delete them, and clean up any pods still in
// the Pending phase.
clientSet, ns, _, err := Clientset(p.config.KubeconfigPath, p.config.Context)
if err != nil {
return err
}
if p.config.Namespace != "" {
ns = p.config.Namespace
}

// Delete the job. This does *not* delete any running pods that the job
// created.
jobsClient := clientSet.BatchV1().Jobs(ns)
if err := jobsClient.Delete(ctx, ti.Id, metav1.DeleteOptions{}); err != nil {
if !errors.IsNotFound(err) {
return err
}
}

// List pods with this job label
podsClient := clientSet.CoreV1().Pods(ns)
pods, err := podsClient.List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", ti.Id),
})
// It's not clear from the documentation if an error is returned from the
// List API call if no pods are found, so we guard here just in case
if err != nil && !errors.IsNotFound(err) {
return err
}

if pods == nil {
log.Info("no pods found for job, returning", "job_id", ti.Id)
return nil
}

// Delete any pods stuck in pending
for _, p := range pods.Items {
if p.Status.Phase == corev1.PodPending {
log.Warn("job pod is in pending phase in StopTask operation, cancelling", "job_id", ti.Id)
if err := podsClient.Delete(ctx, p.Name, metav1.DeleteOptions{}); err != nil {
if !errors.IsNotFound(err) {
return err
}
}
}
}

return nil
}

Expand Down Expand Up @@ -205,8 +252,8 @@ func (p *TaskLauncher) StartTask(
}

// Get container resource limits and requests
var resourceLimits = make(map[corev1.ResourceName]k8sresource.Quantity)
var resourceRequests = make(map[corev1.ResourceName]k8sresource.Quantity)
resourceLimits := make(map[corev1.ResourceName]k8sresource.Quantity)
resourceRequests := make(map[corev1.ResourceName]k8sresource.Quantity)
resourceRequirements := corev1.ResourceRequirements{
Limits: resourceLimits,
Requests: resourceRequests,
Expand Down