Skip to content

Commit

Permalink
feat: Add finalizer to workflow pod to prevent 'pod deleted'. Fixes a…
Browse files Browse the repository at this point in the history
…rgoproj#8783 Continuing Work of argoproj#9058 (argoproj#12413)

Signed-off-by: Atsushi Sakai <sakai.at24@gmail.com>
  • Loading branch information
sakai-ast authored and isubasinghe committed Feb 4, 2024
1 parent a2259fd commit 09067a9
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 21 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ jobs:
run: make cli STATIC_FILES=false
if: ${{matrix.test == 'test-api' || matrix.test == 'test-cli' || matrix.test == 'test-java-sdk' || matrix.test == 'test-python-sdk'}}
- name: Start controller/API
run: make start PROFILE=${{matrix.profile}} AUTH_MODE=client STATIC_FILES=false LOG_LEVEL=info API=${{matrix.test == 'test-api' || matrix.test == 'test-cli' || matrix.test == 'test-java-sdk' || matrix.test == 'test-python-sdk'}} UI=false > /tmp/argo.log 2>&1 &
run: |
make start PROFILE=${{matrix.profile}} \
AUTH_MODE=client STATIC_FILES=false \
LOG_LEVEL=info \
API=${{matrix.test == 'test-api' || matrix.test == 'test-cli' || matrix.test == 'test-java-sdk' || matrix.test == 'test-python-sdk'}} \
UI=false \
POD_STATUS_CAPTURE_FINALIZER=true > /tmp/argo.log 2>&1 &
- name: Wait for controller to be up
run: make wait API=${{matrix.test == 'test-api' || matrix.test == 'test-cli' || matrix.test == 'test-java-sdk' || matrix.test == 'test-python-sdk'}}
timeout-minutes: 5
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ RUN_MODE := kubernetes
endif

ALWAYS_OFFLOAD_NODE_STATUS := false
POD_STATUS_CAPTURE_FINALIZER ?= true

$(info GIT_COMMIT=$(GIT_COMMIT) GIT_BRANCH=$(GIT_BRANCH) GIT_TAG=$(GIT_TAG) GIT_TREE_STATE=$(GIT_TREE_STATE) RELEASE_TAG=$(RELEASE_TAG) DEV_BRANCH=$(DEV_BRANCH) VERSION=$(VERSION))
$(info KUBECTX=$(KUBECTX) DOCKER_DESKTOP=$(DOCKER_DESKTOP) K3D=$(K3D) DOCKER_PUSH=$(DOCKER_PUSH))
Expand Down Expand Up @@ -560,7 +561,7 @@ endif
grep '127.0.0.1.*postgres' /etc/hosts
grep '127.0.0.1.*mysql' /etc/hosts
ifeq ($(RUN_MODE),local)
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) ARGO_SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) ARGO_LOGLEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) ARGO_AUTH_MODE=$(AUTH_MODE) ARGO_NAMESPACED=$(NAMESPACED) ARGO_NAMESPACE=$(KUBE_NAMESPACE) ARGO_MANAGED_NAMESPACE=$(MANAGED_NAMESPACE) ARGO_EXECUTOR_PLUGINS=$(PLUGINS) PROFILE=$(PROFILE) kit $(TASKS)
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) ARGO_SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) ARGO_LOGLEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) ARGO_AUTH_MODE=$(AUTH_MODE) ARGO_NAMESPACED=$(NAMESPACED) ARGO_NAMESPACE=$(KUBE_NAMESPACE) ARGO_MANAGED_NAMESPACE=$(MANAGED_NAMESPACE) ARGO_EXECUTOR_PLUGINS=$(PLUGINS) ARGO_POD_STATUS_CAPTURE_FINALIZER=$(POD_STATUS_CAPTURE_FINALIZER) PROFILE=$(PROFILE) kit $(TASKS)
endif

.PHONY: wait
Expand All @@ -583,7 +584,7 @@ mysql-cli:
test-cli: ./dist/argo

test-%:
go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $* -parallel $(E2E_PARALLEL) ./test/e2e
E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $* -parallel $(E2E_PARALLEL) ./test/e2e

.PHONY: test-examples
test-examples:
Expand All @@ -594,7 +595,7 @@ test-%-sdk:
make --directory sdks/$* install test -B

Test%:
go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags api,cli,cron,executor,examples,corefunctional,functional,plugins -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*'
E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags api,cli,cron,executor,examples,corefunctional,functional,plugins -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*'


# clean
Expand Down
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This document outlines environment variables that can be used to customize behav
| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate that the Argo Agent will patch the workflow task-set. |
| `ARGO_AGENT_CPU_LIMIT` | `resource.Quantity` | `100m` | CPU resource limit for the agent. |
| `ARGO_AGENT_MEMORY_LIMIT` | `resource.Quantity` | `256m` | Memory resource limit for the agent. |
| `ARGO_POD_STATUS_CAPTURE_FINALIZER` | `bool` | `false` | The finalizer blocks the deletion of pods until the controller captures their status.
| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. |
| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. |
| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. |
Expand Down
24 changes: 23 additions & 1 deletion test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package fixtures
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"time"

"github.com/argoproj/argo-workflows/v3/util/secrets"

apierr "k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"github.com/TwiN/go-color"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -124,6 +129,7 @@ func (s *E2ESuite) DeleteResources() {
return Label
}

pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
resources := []schema.GroupVersionResource{
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural},
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.WorkflowPlural},
Expand All @@ -132,12 +138,28 @@ func (s *E2ESuite) DeleteResources() {
{Group: workflow.Group, Version: workflow.Version, Resource: workflow.WorkflowEventBindingPlural},
{Group: workflow.Group, Version: workflow.Version, Resource: "sensors"},
{Group: workflow.Group, Version: workflow.Version, Resource: "eventsources"},
{Version: "v1", Resource: "pods"},
pods,
{Version: "v1", Resource: "resourcequotas"},
{Version: "v1", Resource: "configmaps"},
}
for _, r := range resources {
for {
// remove finalizer from all the resources of the given GroupVersionResource
resourceInf := s.dynamicFor(pods)
resourceList, err := resourceInf.List(ctx, metav1.ListOptions{LabelSelector: common.LabelKeyCompleted + "=false"})
s.CheckError(err)
for _, item := range resourceList.Items {
patch, err := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"finalizers": []string{},
},
})
s.CheckError(err)
_, err = resourceInf.Patch(ctx, item.GetName(), types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil && !apierr.IsNotFound(err) {
s.CheckError(err)
}
}
s.CheckError(s.dynamicFor(r).DeleteCollection(ctx, metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64Ptr(2)}, metav1.ListOptions{LabelSelector: l(r)}))
ls, err := s.dynamicFor(r).List(ctx, metav1.ListOptions{LabelSelector: l(r)})
s.CheckError(err)
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (w *When) WaitForPod(condition PodCondition) *When {
timeout := defaultTimeout
watch, err := w.kubeClient.CoreV1().Pods(Namespace).Watch(
ctx,
metav1.ListOptions{LabelSelector: common.LabelKeyWorkflow + "=" + w.wf.Name, TimeoutSeconds: pointer.Int64Ptr(int64(timeout.Seconds()))},
metav1.ListOptions{LabelSelector: common.LabelKeyWorkflow + "=" + w.wf.Name, TimeoutSeconds: pointer.Int64(int64(timeout.Seconds()))},
)
if err != nil {
w.t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func (s *FunctionalSuite) TestDeletingPendingPod() {
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToStart).
// patch the pod to remove the finalizer
Exec("kubectl", []string{"-n", "argo", "patch", "pod", func() string {
podList, err := s.KubeClient.CoreV1().Pods("argo").List(context.Background(), metav1.ListOptions{LabelSelector: "workflows.argoproj.io/workflow"})
if err != nil {
panic(err)
}
return podList.Items[0].Name
}(), "-p", `{"metadata":{"finalizers":[]}}`, "--type", "merge"}, fixtures.OutputRegexp(`pod/.* patched`)).
Wait(time.Second).
Exec("kubectl", []string{"-n", "argo", "delete", "pod", "-l", "workflows.argoproj.io/workflow"}, fixtures.OutputRegexp(`pod "pending-.*" deleted`)).
Wait(time.Duration(3*fixtures.EnvFactor)*time.Second). // allow 3s for reconciliation, we'll create a new pod
Exec("kubectl", []string{"-n", "argo", "get", "pod", "-l", "workflows.argoproj.io/workflow"}, fixtures.OutputRegexp(`pending-.*Pending`))
Expand Down
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ const (
// Finalizer to block deletion of the workflow if deletion of artifacts fail for some reason.
FinalizerArtifactGC = workflow.WorkflowFullName + "/artifact-gc"

// Finalizer blocks the deletion of pods until the controller captures their status.
FinalizerPodStatus = workflow.WorkflowFullName + "/status"

// Variables that are added to the scope during template execution and can be referenced using {{}} syntax

// GlobalVarWorkflowName is a global workflow variable referencing the workflow's metadata.name field
Expand Down
101 changes: 89 additions & 12 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
gosync "sync"
"syscall"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"k8s.io/client-go/dynamic"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
apiwatch "k8s.io/client-go/tools/watch"
Expand Down Expand Up @@ -150,6 +152,12 @@ type WorkflowController struct {
recentCompletions recentCompletions
}

type PatchOperation struct {
Operation string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}

const (
workflowResyncPeriod = 20 * time.Minute
workflowTemplateResyncPeriod = 20 * time.Minute
Expand Down Expand Up @@ -522,10 +530,9 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
logCtx := log.WithFields(log.Fields{"key": key, "action": action})
logCtx.Info("cleaning up pod")
err := func() error {
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
switch action {
case terminateContainers:
pod, err := wfc.getPod(namespace, podName)
pod, err := wfc.getPodFromCache(namespace, podName)
if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending {
wfc.queuePodForCleanup(namespace, podName, deletePod)
} else if terminationGracePeriod, err := wfc.signalContainers(namespace, podName, syscall.SIGTERM); err != nil {
Expand All @@ -538,17 +545,22 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return err
}
case labelPodCompleted:
_, err := pods.Patch(
ctx,
podName,
types.MergePatchType,
[]byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "true"}}}`),
metav1.PatchOptions{},
)
if err != nil {
// Escape for JSON Pointer https://datatracker.ietf.org/doc/html/rfc6901#section-3
escaped := strings.ReplaceAll(common.LabelKeyCompleted, "/", "~1")
patch := PatchOperation{
Operation: "replace",
Path: fmt.Sprintf("/metadata/labels/%s", escaped),
Value: "true",
}
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, patch); err != nil {
return err
}
case deletePod:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName); err != nil {
return err
}
propagation := metav1.DeletePropagationBackground
err := pods.Delete(ctx, podName, metav1.DeleteOptions{
PropagationPolicy: &propagation,
Expand All @@ -569,7 +581,15 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return true
}

func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) {
pod, err := wfc.kubeclientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, err
}
return pod, nil
}

func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return nil, err
Expand All @@ -584,8 +604,50 @@ func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.
return pod, nil
}

func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, extraPatches ...PatchOperation) error {
var patches []PatchOperation
pod, err := wfc.getPodFromAPI(ctx, namespace, podName)
if err != nil {
return err
}
patch := createFinalizerRemovalPatchIfExists(pod, common.FinalizerPodStatus)
if patch != nil {
patches = append(patches, *patch)
}
patches = append(patches, extraPatches...)
if err := applyPatches(ctx, pods, pod.Name, patches); err != nil {
return err
}
return nil
}

func createFinalizerRemovalPatchIfExists(pod *apiv1.Pod, targetFinalizer string) *PatchOperation {
i := slices.Index(pod.Finalizers, targetFinalizer)
if i >= 0 {
return &PatchOperation{
Operation: "remove",
Path: fmt.Sprintf("/metadata/finalizers/%d", i),
}
}
return nil
}

func applyPatches(ctx context.Context, pods typedv1.PodInterface, podName string, patches []PatchOperation) error {
if len(patches) == 0 {
log.WithField("podName", podName).Debug("not patching pod")
return nil
}
data, err := json.Marshal(patches)
if err != nil {
return fmt.Errorf("failed to marshal patch: %w", err)
}
log.WithFields(log.Fields{"podName": podName, "data": string(data)}).Debug("patching pod")
_, err = pods.Patch(ctx, podName, types.JSONPatchType, data, metav1.PatchOptions{})
return err
}

func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPod(namespace, podName)
pod, err := wfc.getPodFromCache(namespace, podName)
if pod == nil || err != nil {
return 0, err
}
Expand Down Expand Up @@ -991,6 +1053,21 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.

// Remove finalizers from Pods if they exist before deletion
pods := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace())
podList, err := pods.List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", common.LabelKeyWorkflow, obj.(*unstructured.Unstructured).GetName()),
})
if err != nil {
log.WithError(err).Error("Failed to list pods")
}
for _, p := range podList.Items {
if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name); err != nil {
log.WithError(err).Error("Failed to enable pod for deletion")
}
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
wfc.releaseAllWorkflowLocks(obj)
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, erro
}

podName := woc.getPodName(node.Name, node.TemplateName)
return woc.controller.getPod(woc.wf.GetNamespace(), podName)
return woc.controller.getPodFromCache(woc.wf.GetNamespace(), podName)
}

func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
Expand Down
5 changes: 5 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
Expand Down Expand Up @@ -186,6 +187,10 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
},
}

if os.Getenv("ARGO_POD_STATUS_CAPTURE_FINALIZER") == "true" {
pod.ObjectMeta.Finalizers = append(pod.ObjectMeta.Finalizers, common.FinalizerPodStatus)
}

if opts.onExitPod {
// This pod is part of an onExit handler, label it so
pod.ObjectMeta.Labels[common.LabelKeyOnExit] = "true"
Expand Down
39 changes: 37 additions & 2 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -1397,7 +1396,7 @@ func TestPodSpecPatchPodName(t *testing.T) {
{"v2", "hello-world-whalesay-3731220306", helloWorldStepWfWithPatch},
}
for _, tt := range tests {
os.Setenv("POD_NAMES", tt.podNameVersion)
t.Setenv("POD_NAMES", tt.podNameVersion)
ctx := context.Background()
wf := wfv1.MustUnmarshalWorkflow(tt.workflowYaml)
woc := newWoc(*wf)
Expand Down Expand Up @@ -1789,6 +1788,42 @@ func TestPodExists(t *testing.T) {
assert.EqualValues(t, pod, existingPod)
}

func TestPodFinalizerExits(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "true")
cancel, controller := newController()
defer cancel()

wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow(ctx)
assert.NoError(t, err)
mainCtr := woc.execWf.Spec.Templates[0].Container
pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.NoError(t, err)
assert.NotNil(t, pod)

assert.Equal(t, []string{common.FinalizerPodStatus}, pod.GetFinalizers())
}

func TestPodFinalizerDoesNotExist(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "false")
cancel, controller := newController()
defer cancel()

wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow(ctx)
assert.NoError(t, err)
mainCtr := woc.execWf.Spec.Templates[0].Container
pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
assert.NoError(t, err)
assert.NotNil(t, pod)

assert.Equal(t, []string(nil), pod.GetFinalizers())
}

func TestProgressEnvVars(t *testing.T) {
setup := func(t *testing.T, options ...interface{}) (context.CancelFunc, *apiv1.Pod) {
cancel, controller := newController(options...)
Expand Down

0 comments on commit 09067a9

Please sign in to comment.