fix: Clean up pods of fulfilled nodes when workflow manual retry. Fix… #12105

Open · wants to merge 2 commits into base: main
@@ -29,6 +29,7 @@ rules:
- list
- watch
- delete
- patch

@agilgur5 (Member) commented on Oct 30, 2023:
this is a pretty minor permission given the Server already has delete, but I was thinking this logic may make more sense on the Controller actually.

When the Controller detects a retry, it can check the Workflow's child Pods.

Need to think a bit more about how that would work, but that would preserve the existing separation of duties between Server and Controller, where the Server is just a simple intermediary for users that can be bypassed with correct RBAC.
The Server primarily reads and listens to changes, and its modifications are limited to signaling the Controller to perform an action (via a label, for instance). The Controller logic is actually responsible for performing the actions themselves (such as retrying).
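
As a rough sketch of that split (illustrative only, not what this PR implements): the Server would only patch a signal label onto the Workflow, and the Controller would own all Pod changes. The `workflows.argoproj.io/retry-requested` label and both function names below are hypothetical; the clientset and patch calls are the standard generated ones.

```go
// Server side: request a retry by labeling the Workflow; no Pod RBAC needed.
func requestRetry(ctx context.Context, wfClient versioned.Interface, namespace, name string) error {
	patch := []byte(`{"metadata": {"labels": {"workflows.argoproj.io/retry-requested": "true"}}}`)
	_, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).Patch(
		ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

// Controller side (e.g. early in operate()): detect the signal, perform the
// Pod deletes/resets and node status resets itself, then clear the label so
// the signal is only handled once.
func (woc *wfOperationCtx) processRetrySignal(ctx context.Context) error {
	if woc.wf.Labels["workflows.argoproj.io/retry-requested"] != "true" {
		return nil
	}
	// ...delete Pods of failed nodes, relabel Pods of fulfilled nodes,
	// reset node statuses, then remove the signal label...
	return nil
}
```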

@agilgur5 (Member) commented on Oct 30, 2023:
Hmmm I see that podsToDelete already broke the separation a bit... we honestly may want to refactor that too...

@terrytangyuan do you have any thoughts on this? Specifically, the current behavior with the Server having Pod modification logic actually invalidates your answer in #12027 (comment). I think it ideally should behave the way you described there (and how I described above), if possible.

A Member commented:

You are right. RBAC modifications are required for my answer in #12027 (comment).

A Member commented:

@terrytangyuan I was actually looking for your thoughts on the approach. I think we should refactor this logic so the Server performs nothing but a label modification, and the Controller then actually deletes, resets, etc. child Pods as needed (as that is the Controller's responsibility, not the Server's).

@sarabala1979 (Member) commented on Nov 27, 2023:

@terrytangyuan I agree with @agilgur5. The Server should not update or delete anything other than Argo Workflow CRDs. If a user updates the Workflow spec directly, this will not work. The Controller should handle cleaning up the workflow's Pods based on the GC strategy.
I remember @ishitasequeira proposed deleting all of a workflow's Pods label-based (by workflow name), like the deletion that happens when the workflow completes.

A Member commented:

Julie agreed in #12419 (comment) as well, so I filed #12538 as a tracking issue for this refactor

A Member commented:

SGTM

- apiGroups:
- ""
resources:
@@ -29,6 +29,7 @@ rules:
- list
- watch
- delete
- patch
- apiGroups:
- ""
resources:
1 change: 1 addition & 0 deletions manifests/quick-start-minimal.yaml


1 change: 1 addition & 0 deletions manifests/quick-start-mysql.yaml


1 change: 1 addition & 0 deletions manifests/quick-start-postgres.yaml


16 changes: 15 additions & 1 deletion server/workflow/workflow_server.go
@@ -389,7 +389,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, sutils.ToStatusError(err, codes.Internal)
}

wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
wf, podsToDelete, podsToReset, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
@@ -402,6 +402,20 @@ }
}
}

for _, podName := range podsToReset {
log.WithFields(log.Fields{"podReset": podName}).Info("Resetting pod")
_, err := kubeClient.CoreV1().Pods(wf.Namespace).Patch(
ctx,
podName,
types.MergePatchType,
[]byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`),
metav1.PatchOptions{},
)
if err != nil && !apierr.IsNotFound(err) {
return nil, sutils.ToStatusError(err, codes.Internal)
}
}

err = s.hydrator.Dehydrate(wf)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
17 changes: 16 additions & 1 deletion server/workflowarchive/archived_workflow_server.go
@@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
@@ -286,7 +287,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
_, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{})
if apierr.IsNotFound(err) {

wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
wf, podsToDelete, podsToReset, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
@@ -299,6 +300,20 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
}
}

for _, podName := range podsToReset {

@agilgur5 (Member) commented on Oct 30, 2023:
I was gonna say that this may make sense to place into a helper function, but I see that #7988 (comment) explicitly moved the kubeClient logic out of the helper function (when retry was introduced for archived workflows in #7988).

So this matches existing behavior with podsToDelete above. If we refactored these into the Controller (per above comment), this would go away though 🤔
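
For reference, a hypothetical shared helper along those lines; the name and signature are illustrative, not part of this PR, and assume the `kubeClient` can be passed in:

```go
// resetPodsCompletedLabel relabels the given Pods as not-completed so the
// Controller reconciles them again on retry; already-deleted Pods are ignored.
func resetPodsCompletedLabel(ctx context.Context, kubeClient kubernetes.Interface, namespace string, podNames []string) error {
	patch := []byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`)
	for _, podName := range podNames {
		_, err := kubeClient.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
		if err != nil && !apierr.IsNotFound(err) {
			return err
		}
	}
	return nil
}
```

Both RetryWorkflow and RetryArchivedWorkflow could then call it in place of the inlined loops.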

log.WithFields(log.Fields{"podReset": podName}).Info("Resetting pod")
_, err := kubeClient.CoreV1().Pods(wf.Namespace).Patch(
ctx,
podName,
types.MergePatchType,
[]byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`),
metav1.PatchOptions{},
)
if err != nil && !apierr.IsNotFound(err) {
return nil, sutils.ToStatusError(err, codes.Internal)
}
}

wf.ObjectMeta.ResourceVersion = ""
wf.ObjectMeta.UID = ""
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
3 changes: 2 additions & 1 deletion workflow/controller/operator.go
@@ -1099,7 +1099,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err == nil {
// Pods of fulfilled nodes may have been relabeled completed=false by a manual workflow retry.
if err == nil && !node.Phase.Fulfilled() {
if newState := woc.assessNodeStatus(pod, node); newState != nil {
woc.addOutputsToGlobalScope(newState.Outputs)
if newState.MemoizationStatus != nil {
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
@@ -5677,7 +5677,7 @@ status:
name: my-wf
phase: Failed
`)
wf, _, err := util.FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
wf, _, _, err := util.FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
if assert.NoError(t, err) {
cancel, controller := newController(wf)
defer cancel()
25 changes: 16 additions & 9 deletions workflow/util/util.go
@@ -831,15 +831,15 @@ func resetConnectedParentGroupNodes(oldWF *wfv1.Workflow, newWF *wfv1.Workflow,
}

// FormulateRetryWorkflow formulates a previous workflow to be retried, deleting all failed steps as well as the onExit node (and children)
func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, error) {
func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, []string, error) {
switch wf.Status.Phase {
case wfv1.WorkflowFailed, wfv1.WorkflowError:
case wfv1.WorkflowSucceeded:
if !(restartSuccessful && len(nodeFieldSelector) > 0) {
return nil, nil, errors.Errorf(errors.CodeBadRequest, "To retry a succeeded workflow, set the options restartSuccessful and nodeFieldSelector")
return nil, nil, nil, errors.Errorf(errors.CodeBadRequest, "To retry a succeeded workflow, set the options restartSuccessful and nodeFieldSelector")
}
default:
return nil, nil, errors.Errorf(errors.CodeBadRequest, "Cannot retry a workflow in phase %s", wf.Status.Phase)
return nil, nil, nil, errors.Errorf(errors.CodeBadRequest, "Cannot retry a workflow in phase %s", wf.Status.Phase)
}

newWF := wf.DeepCopy()
@@ -870,21 +870,22 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}
err := overrideParameters(newWF, parameters)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

onExitNodeName := wf.ObjectMeta.Name + ".onExit"
// Get all children of nodes that match filter
nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, wf.Status.Nodes)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

// Iterate the previous nodes. If it was successful Pod carry it forward
deletedNodes := make(map[string]bool)
deletedPods := make(map[string]bool)
var podsToDelete []string
var podsToReset []string
var resetParentGroupNodes []string
for _, node := range wf.Status.Nodes {
doForceResetNode := false
@@ -906,7 +907,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
childNode, err := wf.Status.Nodes.Get(child)
if err != nil {
log.Fatalf("was unable to obtain node for %s due to %s", child, err)
return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", child, err)
return nil, nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", child, err)
}
if _, present := nodeIDsToReset[child]; present {
log.Debugf("Group node %s needs to reset since its child %s is in the force reset path", node.Name, childNode.Name)
@@ -936,7 +937,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
descendantNode, err := wf.Status.Nodes.Get(descendantNodeID)
if err != nil {
log.Fatalf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
return nil, nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
}
if descendantNode.Type == wfv1.NodeTypePod {
newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
@@ -952,6 +953,12 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
}
} else {
if !containsNode(resetParentGroupNodes, node.ID) {
if node.Type == wfv1.NodeTypePod {
templateName := GetTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
podName := GeneratePodName(wf.Name, node.Name, templateName, node.ID, version)
podsToReset = append(podsToReset, podName)
}
log.Debugf("Node %s remains as is", node.Name)
newWF.Status.Nodes.Set(node.ID, node)
}
@@ -971,7 +978,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
// do not add this status to the node. pretend as if this node never existed.
default:
// Do not allow retry of workflows with pods in Running/Pending phase
return nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
return nil, nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
}

if node.Name == wf.ObjectMeta.Name {
@@ -1015,7 +1022,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
newWF.Status.StoredTemplates[id] = tmpl
}

return newWF, podsToDelete, nil
return newWF, podsToDelete, podsToReset, nil
}

func resetNode(node wfv1.NodeStatus) wfv1.NodeStatus {