diff --git a/manifests/cluster-install/argo-server-rbac/argo-server-clusterole.yaml b/manifests/cluster-install/argo-server-rbac/argo-server-clusterole.yaml
index 0882c9a7b9c9..0be9eefff217 100644
--- a/manifests/cluster-install/argo-server-rbac/argo-server-clusterole.yaml
+++ b/manifests/cluster-install/argo-server-rbac/argo-server-clusterole.yaml
@@ -29,6 +29,7 @@ rules:
       - list
       - watch
       - delete
+      - patch
   - apiGroups:
       - ""
     resources:
diff --git a/manifests/namespace-install/argo-server-rbac/argo-server-role.yaml b/manifests/namespace-install/argo-server-rbac/argo-server-role.yaml
index 314177a3ef8e..c2e56a86376f 100644
--- a/manifests/namespace-install/argo-server-rbac/argo-server-role.yaml
+++ b/manifests/namespace-install/argo-server-rbac/argo-server-role.yaml
@@ -29,6 +29,7 @@ rules:
       - list
      - watch
       - delete
+      - patch
   - apiGroups:
       - ""
     resources:
diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml
index 394f0eb64abd..6ea284a80c2c 100644
--- a/manifests/quick-start-minimal.yaml
+++ b/manifests/quick-start-minimal.yaml
@@ -1075,6 +1075,7 @@ rules:
   - list
   - watch
   - delete
+  - patch
 - apiGroups:
   - ""
   resources:
diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml
index a697071215f9..c9d90b191ec9 100644
--- a/manifests/quick-start-mysql.yaml
+++ b/manifests/quick-start-mysql.yaml
@@ -1075,6 +1075,7 @@ rules:
   - list
   - watch
   - delete
+  - patch
 - apiGroups:
   - ""
   resources:
diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml
index 3cfacf0160c3..a8a65899385e 100644
--- a/manifests/quick-start-postgres.yaml
+++ b/manifests/quick-start-postgres.yaml
@@ -1075,6 +1075,7 @@ rules:
   - list
   - watch
   - delete
+  - patch
 - apiGroups:
   - ""
   resources:
diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go
index 88fce952bafe..3c6af5a75047 100644
--- a/server/workflow/workflow_server.go
+++ b/server/workflow/workflow_server.go
@@ -389,7 +389,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
 		return nil, sutils.ToStatusError(err, codes.Internal)
 	}
 
-	wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
+	wf, podsToDelete, podsToReset, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
 	if err != nil {
 		return nil, sutils.ToStatusError(err, codes.Internal)
 	}
@@ -402,6 +402,20 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
 		}
 	}
 
+	for _, podName := range podsToReset {
+		log.WithFields(log.Fields{"podReset": podName}).Info("Resetting pod")
+		_, err := kubeClient.CoreV1().Pods(wf.Namespace).Patch(
+			ctx,
+			podName,
+			types.MergePatchType,
+			[]byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`),
+			metav1.PatchOptions{},
+		)
+		if err != nil && !apierr.IsNotFound(err) {
+			return nil, sutils.ToStatusError(err, codes.Internal)
+		}
+	}
+
 	err = s.hydrator.Dehydrate(wf)
 	if err != nil {
 		return nil, sutils.ToStatusError(err, codes.Internal)
diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go
index 511fe8d3ecc6..eb0907be7339 100644
--- a/server/workflowarchive/archived_workflow_server.go
+++ b/server/workflowarchive/archived_workflow_server.go
@@ -17,6 +17,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/types"
"github.com/argoproj/argo-workflows/v3/persist/sqldb" workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive" @@ -286,7 +287,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req _, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{}) if apierr.IsNotFound(err) { - wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters) + wf, podsToDelete, podsToReset, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } @@ -299,6 +300,20 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req } } + for _, podName := range podsToReset { + log.WithFields(log.Fields{"podReset": podName}).Info("Resetting pod") + _, err := kubeClient.CoreV1().Pods(wf.Namespace).Patch( + ctx, + podName, + types.MergePatchType, + []byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`), + metav1.PatchOptions{}, + ) + if err != nil && !apierr.IsNotFound(err) { + return nil, sutils.ToStatusError(err, codes.Internal) + } + } + wf.ObjectMeta.ResourceVersion = "" wf.ObjectMeta.UID = "" result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{}) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 98ebbaf7fbc8..fd9de4b2890a 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1099,7 +1099,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { wfNodesLock.Lock() defer wfNodesLock.Unlock() node, err := woc.wf.Status.Nodes.Get(nodeID) - if err == nil { + // Pods of fulfilled nodes would be relabeled completed=false when workflow manual retry. 
+		if err == nil && !node.Phase.Fulfilled() {
 			if newState := woc.assessNodeStatus(pod, node); newState != nil {
 				woc.addOutputsToGlobalScope(newState.Outputs)
 				if newState.MemoizationStatus != nil {
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 0e3a431c8d73..07059d728add 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -5677,7 +5677,7 @@ status:
   name: my-wf
   phase: Failed
 `)
-	wf, _, err := util.FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
+	wf, _, _, err := util.FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
 	if assert.NoError(t, err) {
 		cancel, controller := newController(wf)
 		defer cancel()
diff --git a/workflow/util/util.go b/workflow/util/util.go
index 3083c4f63ed5..6a9f3988ef24 100644
--- a/workflow/util/util.go
+++ b/workflow/util/util.go
@@ -831,15 +831,15 @@ func resetConnectedParentGroupNodes(oldWF *wfv1.Workflow, newWF *wfv1.Workflow,
 }
 
 // FormulateRetryWorkflow formulates a previous workflow to be retried, deleting all failed steps as well as the onExit node (and children)
-func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, error) {
+func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, []string, []string, error) {
 	switch wf.Status.Phase {
 	case wfv1.WorkflowFailed, wfv1.WorkflowError:
 	case wfv1.WorkflowSucceeded:
 		if !(restartSuccessful && len(nodeFieldSelector) > 0) {
-			return nil, nil, errors.Errorf(errors.CodeBadRequest, "To retry a succeeded workflow, set the options restartSuccessful and nodeFieldSelector")
+			return nil, nil, nil, errors.Errorf(errors.CodeBadRequest, "To retry a succeeded workflow, set the options restartSuccessful and nodeFieldSelector")
 		}
 	default:
-		return nil, nil, errors.Errorf(errors.CodeBadRequest, "Cannot retry a workflow in phase %s", wf.Status.Phase)
+		return nil, nil, nil, errors.Errorf(errors.CodeBadRequest, "Cannot retry a workflow in phase %s", wf.Status.Phase)
 	}
 
 	newWF := wf.DeepCopy()
@@ -870,7 +870,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 		}
 		err := overrideParameters(newWF, parameters)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 	}
 
@@ -878,13 +878,14 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 	// Get all children of nodes that match filter
 	nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, wf.Status.Nodes)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
 	// Iterate the previous nodes. If it was successful Pod carry it forward
 	deletedNodes := make(map[string]bool)
 	deletedPods := make(map[string]bool)
 	var podsToDelete []string
+	var podsToReset []string
 	var resetParentGroupNodes []string
 	for _, node := range wf.Status.Nodes {
 		doForceResetNode := false
@@ -906,7 +907,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 				childNode, err := wf.Status.Nodes.Get(child)
 				if err != nil {
 					log.Fatalf("was unable to obtain node for %s due to %s", child, err)
-					return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", child, err)
+					return nil, nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", child, err)
 				}
 				if _, present := nodeIDsToReset[child]; present {
 					log.Debugf("Group node %s needs to reset since its child %s is in the force reset path", node.Name, childNode.Name)
@@ -936,7 +937,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 			descendantNode, err := wf.Status.Nodes.Get(descendantNodeID)
 			if err != nil {
 				log.Fatalf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
-				return nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
+				return nil, nil, nil, fmt.Errorf("Was unable to obtain node for %s due to %s", descendantNodeID, err)
 			}
 			if descendantNode.Type == wfv1.NodeTypePod {
 				newWF, resetParentGroupNodes = resetConnectedParentGroupNodes(wf, newWF, node, resetParentGroupNodes)
@@ -952,6 +953,12 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 			}
 		} else {
 			if !containsNode(resetParentGroupNodes, node.ID) {
+				if node.Type == wfv1.NodeTypePod {
+					templateName := GetTemplateFromNode(node)
+					version := GetWorkflowPodNameVersion(wf)
+					podName := GeneratePodName(wf.Name, node.Name, templateName, node.ID, version)
+					podsToReset = append(podsToReset, podName)
+				}
 				log.Debugf("Node %s remains as is", node.Name)
 				newWF.Status.Nodes.Set(node.ID, node)
 			}
@@ -971,7 +978,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 			// do not add this status to the node. pretend as if this node never existed.
 		default:
 			// Do not allow retry of workflows with pods in Running/Pending phase
-			return nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
+			return nil, nil, nil, errors.InternalErrorf("Workflow cannot be retried with node %s in %s phase", node.Name, node.Phase)
 		}
 
 		if node.Name == wf.ObjectMeta.Name {
@@ -1015,7 +1022,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
 		newWF.Status.StoredTemplates[id] = tmpl
 	}
 
-	return newWF, podsToDelete, nil
+	return newWF, podsToDelete, podsToReset, nil
 }
 
 func resetNode(node wfv1.NodeStatus) wfv1.NodeStatus {
diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go
index 0328b37d04ab..ff0049128f90 100644
--- a/workflow/util/util_test.go
+++ b/workflow/util/util_test.go
@@ -921,7 +921,7 @@ func TestDeepDeleteNodes(t *testing.T) {
 	ctx := context.Background()
 	wf, err := wfIf.Create(ctx, origWf, metav1.CreateOptions{})
 	if assert.NoError(t, err) {
-		newWf, _, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		newWf, _, _, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		assert.NoError(t, err)
 		newWfBytes, err := yaml.Marshal(newWf)
 		assert.NoError(t, err)
@@ -1052,7 +1052,7 @@ func TestRetryExitHandler(t *testing.T) {
 	ctx := context.Background()
 	wf, err := wfIf.Create(ctx, origWf, metav1.CreateOptions{})
 	if assert.NoError(t, err) {
-		newWf, _, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		newWf, _, _, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		assert.NoError(t, err)
 		newWfBytes, err := yaml.Marshal(newWf)
 		assert.NoError(t, err)
@@ -1086,7 +1086,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		if assert.NoError(t, err) {
 			assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
 			assert.Equal(t, metav1.Time{}, wf.Status.FinishedAt)
@@ -1123,7 +1123,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		if assert.NoError(t, err) {
 			if assert.Len(t, wf.Status.Nodes, 1) {
 				assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes[""].Phase)
@@ -1158,7 +1158,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=suspended", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=suspended", nil)
 		if assert.NoError(t, err) {
 			if assert.Len(t, wf.Status.Nodes, 3) {
 				assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["entrypoint"].Phase)
@@ -1190,7 +1190,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=3", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=3", nil)
 		if assert.NoError(t, err) {
 			// Node #3, #4 are deleted and will be recreated so only 3 nodes left in wf.Status.Nodes
 			if assert.Len(t, wf.Status.Nodes, 3) {
@@ -1219,7 +1219,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, true, "", nil)
 		if assert.NoError(t, err) {
 			// Node #2, #3, and #4 are deleted and will be recreated so only 2 nodes left in wf.Status.Nodes
 			if assert.Len(t, wf.Status.Nodes, 4) {
@@ -1248,7 +1248,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 				"1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeTaskGroup},
 			}},
 		}
-		wf, _, err := FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
+		wf, _, _, err := FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
 		if assert.NoError(t, err) {
 			assert.Equal(t, "modified", wf.Spec.Arguments.Parameters[0].Value.String())
 		}
@@ -1276,7 +1276,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 				}},
 			}},
 		}
-		wf, _, err := FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
+		wf, _, _, err := FormulateRetryWorkflow(context.Background(), wf, false, "", []string{"message=modified"})
 		if assert.NoError(t, err) {
 			assert.Equal(t, "modified", wf.Spec.Arguments.Parameters[0].Value.String())
 			assert.Equal(t, "modified", wf.Status.StoredWorkflowSpec.Arguments.Parameters[0].Value.String())
@@ -1296,7 +1296,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		_, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		_, _, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		assert.Error(t, err)
 	})
 
@@ -1313,7 +1313,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		_, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		_, _, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		assert.Error(t, err)
 	})
 
@@ -1333,7 +1333,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		_, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
+		_, _, _, err = FormulateRetryWorkflow(ctx, wf, false, "", nil)
 		assert.Error(t, err)
 	})
 
@@ -1355,7 +1355,7 @@ func TestFormulateRetryWorkflow(t *testing.T) {
 		}
 		_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
 		assert.NoError(t, err)
-		wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=4", nil)
+		wf, _, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=4", nil)
 		if assert.NoError(t, err) {
 			// Node #4 is deleted and will be recreated so only 4 nodes left in wf.Status.Nodes
 			if assert.Len(t, wf.Status.Nodes, 4) {
@@ -1908,25 +1908,27 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	wf := wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
 
 	// Retry top individual pod node
-	wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step1", nil)
+	wf, podsToDelete, podsToReset, err := FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step1", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
 	assert.Equal(t, 6, len(podsToDelete))
+	assert.Equal(t, 0, len(podsToReset))
 
 	// Retry top individual suspend node
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step2", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step2", nil)
"name=fail-two-nested-dag-suspend.dag1-step2", nil) assert.NoError(t, err) assert.Equal(t, 3, len(wf.Status.Nodes)) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase) assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step1").Phase) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step2").Phase) assert.Equal(t, 5, len(podsToDelete)) + assert.Equal(t, 1, len(podsToReset)) // Retry the starting on first DAG in one of the branches wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes) - wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2", nil) + wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2", nil) assert.NoError(t, err) assert.Equal(t, 12, len(wf.Status.Nodes)) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase) @@ -1944,10 +1946,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1").Phase) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1").Phase) assert.Equal(t, 3, len(podsToDelete)) + assert.Equal(t, 3, len(podsToReset)) // Retry the starting on second DAG in one of the branches wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes) - wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1", nil) + wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1", nil) assert.NoError(t, err) assert.Equal(t, 12, len(wf.Status.Nodes)) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase) @@ -1965,10 +1968,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1").Phase) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1").Phase) assert.Equal(t, 3, len(podsToDelete)) + assert.Equal(t, 3, len(podsToReset)) // Retry the first individual node (suspended node) connecting to the second DAG in one of the branches wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes) - wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1", nil) + wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1", nil) assert.NoError(t, err) assert.Equal(t, 12, len(wf.Status.Nodes)) assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase) @@ -1986,10 +1990,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1").Phase) assert.Equal(t, wfv1.NodeRunning, 
 	assert.Equal(t, 3, len(podsToDelete))
+	assert.Equal(t, 3, len(podsToReset))
 
 	// Retry the second individual node (pod node) connecting to the second DAG in one of the branches
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step2", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step2", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 12, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
@@ -2008,10 +2013,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	// The suspended node remains succeeded
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1").Phase)
 	assert.Equal(t, 3, len(podsToDelete))
+	assert.Equal(t, 3, len(podsToReset))
 
 	// Retry the third individual node (pod node) connecting to the second DAG in one of the branches
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step3", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step3", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 13, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
@@ -2031,10 +2037,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step1").Phase)
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step2").Phase)
 	assert.Equal(t, 2, len(podsToDelete))
+	assert.Equal(t, 4, len(podsToReset))
 
 	// Retry the last individual node (suspend node) connecting to the second DAG in one of the branches
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step2", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step2", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 15, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
@@ -2055,10 +2062,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step1.dag3-step2").Phase)
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step2").Phase)
 	assert.Equal(t, 1, len(podsToDelete))
+	assert.Equal(t, 5, len(podsToReset))
 
 	// Retry the node that connects the two branches
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step4", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step4", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 16, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
@@ -2078,10 +2086,11 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step2").Phase)
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step4").Phase)
 	assert.Equal(t, 1, len(podsToDelete))
+	assert.Equal(t, 5, len(podsToReset))
 
 	// Retry the last node (failing node)
 	wf = wfv1.MustUnmarshalWorkflow(retryWorkflowWithNestedDAGsWithSuspendNodes)
-	wf, podsToDelete, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step5-tofail", nil)
+	wf, podsToDelete, podsToReset, err = FormulateRetryWorkflow(ctx, wf, true, "name=fail-two-nested-dag-suspend.dag1-step5-tofail", nil)
 	assert.NoError(t, err)
 	assert.Equal(t, 16, len(wf.Status.Nodes))
 	assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["fail-two-nested-dag-suspend"].Phase)
@@ -2101,4 +2110,5 @@ func TestRetryWorkflowWithNestedDAGsWithSuspendNodes(t *testing.T) {
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step3-middle2.dag2-branch2-step2").Phase)
 	assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByName("fail-two-nested-dag-suspend.dag1-step4").Phase)
 	assert.Equal(t, 1, len(podsToDelete))
+	assert.Equal(t, 5, len(podsToReset))
 }
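
Taken together, the server-side changes above send a JSON merge patch for every pod name in podsToReset, flipping the workflows.argoproj.io/completed label back to "false"; the operator.go change then skips re-assessing fulfilled nodes, which appears intended to stop the old pod state from immediately marking the reset nodes complete again (hence the new patch verb in the RBAC manifests). Below is a minimal, self-contained sketch of that relabeling call, not part of the patch: the package, helper name, and wiring are hypothetical, assuming only a configured client-go kubernetes.Interface.

// Editor's illustration — mirrors the merge patch issued in the diff above.
package retrysketch

import (
	"context"
	"fmt"

	apierr "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// resetCompletedLabel (hypothetical helper) relabels a workflow pod as not
// completed so the controller resumes reconciling it after a manual retry.
func resetCompletedLabel(ctx context.Context, kubeClient kubernetes.Interface, namespace, podName string) error {
	patch := []byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "false"}}}`)
	_, err := kubeClient.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
	// A pod that has already been garbage-collected is not an error for retry purposes.
	if err != nil && !apierr.IsNotFound(err) {
		return fmt.Errorf("failed to reset pod %s/%s: %w", namespace, podName, err)
	}
	return nil
}

A merge patch is a natural fit here: it touches only the one label, so it cannot clobber concurrent changes to other pod metadata, and repeating it is idempotent.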