diff --git a/api/jsonschema/schema.json b/api/jsonschema/schema.json index fe6f65c1c6a4..a04feec70437 100644 --- a/api/jsonschema/schema.json +++ b/api/jsonschema/schema.json @@ -7648,7 +7648,7 @@ "additionalProperties": { "type": "boolean" }, - "description": "TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.", + "description": "TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.", "type": "object" } }, diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index a0e7e27825ee..7bfaf5e332fb 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -11568,7 +11568,7 @@ "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.SynchronizationStatus" }, "taskResultsCompletionStatus": { - "description": "TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.", + "description": "TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.", "type": "object", "additionalProperties": { "type": "boolean" diff --git a/docs/fields.md b/docs/fields.md index 04989d2410a8..e59816b9854c 100644 --- a/docs/fields.md +++ b/docs/fields.md @@ -857,7 +857,7 @@ WorkflowStatus contains overall status information about a workflow |`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.| |`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.| |`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks| -|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.| +|`taskResultsCompletionStatus`|`Map< boolean , string >`|TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.| ## CronWorkflowSpec diff --git a/pkg/apis/workflow/v1alpha1/generated.proto b/pkg/apis/workflow/v1alpha1/generated.proto index b22e9b37ca5c..03f7bb7507e5 100644 --- a/pkg/apis/workflow/v1alpha1/generated.proto +++ b/pkg/apis/workflow/v1alpha1/generated.proto @@ -2137,7 +2137,7 @@ message WorkflowStatus { // ArtifactGCStatus maintains the status of Artifact Garbage Collection optional ArtGCStatus artifactGCStatus = 19; - // TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection. + // TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection. map taskResultsCompletionStatus = 20; } diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index ac4784c50ce7..e98581e88e83 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -7895,7 +7895,7 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowStatus(ref common.ReferenceCallba }, "taskResultsCompletionStatus": { SchemaProps: spec.SchemaProps{ - Description: "TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.", + Description: "TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.", Type: []string{"object"}, AdditionalProperties: &spec.SchemaOrBool{ Allows: true, diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index 7b36fd0b6e99..8cc3170f4796 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -1944,7 +1944,7 @@ type WorkflowStatus struct { // ArtifactGCStatus maintains the status of Artifact Garbage Collection ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"` - // TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection. + // TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection. TaskResultsCompletionStatus map[string]bool `json:"taskResultsCompletionStatus,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompletionStatus"` } @@ -1971,6 +1971,14 @@ func (ws *WorkflowStatus) TaskResultsInProgress() bool { return false } +func (ws *WorkflowStatus) IsTaskResultIncomplete(name string) bool { + value, found := ws.TaskResultsCompletionStatus[name] + if found { + return !value + } + return true +} + func (ws *WorkflowStatus) IsOffloadNodeStatus() bool { return ws.OffloadNodeStatusVersion != "" } diff --git a/sdks/java/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md b/sdks/java/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md index c008ea6ef78c..8fa76f9bbbc7 100644 --- a/sdks/java/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md +++ b/sdks/java/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md @@ -26,7 +26,7 @@ Name | Type | Description | Notes **storedTemplates** | [**Map<String, IoArgoprojWorkflowV1alpha1Template>**](IoArgoprojWorkflowV1alpha1Template.md) | StoredTemplates is a mapping between a template ref and the node's status. | [optional] **storedWorkflowTemplateSpec** | [**IoArgoprojWorkflowV1alpha1WorkflowSpec**](IoArgoprojWorkflowV1alpha1WorkflowSpec.md) | | [optional] **synchronization** | [**IoArgoprojWorkflowV1alpha1SynchronizationStatus**](IoArgoprojWorkflowV1alpha1SynchronizationStatus.md) | | [optional] -**taskResultsCompletionStatus** | **Map<String, Boolean>** | TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection. | [optional] +**taskResultsCompletionStatus** | **Map<String, Boolean>** | TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection. | [optional] diff --git a/sdks/python/client/argo_workflows/model/io_argoproj_workflow_v1alpha1_workflow_status.py b/sdks/python/client/argo_workflows/model/io_argoproj_workflow_v1alpha1_workflow_status.py index 45224a451dfb..86e17c538bbd 100644 --- a/sdks/python/client/argo_workflows/model/io_argoproj_workflow_v1alpha1_workflow_status.py +++ b/sdks/python/client/argo_workflows/model/io_argoproj_workflow_v1alpha1_workflow_status.py @@ -210,7 +210,7 @@ def _from_openapi_data(cls, *args, **kwargs): # noqa: E501 stored_templates ({str: (IoArgoprojWorkflowV1alpha1Template,)}): StoredTemplates is a mapping between a template ref and the node's status.. [optional] # noqa: E501 stored_workflow_template_spec (IoArgoprojWorkflowV1alpha1WorkflowSpec): [optional] # noqa: E501 synchronization (IoArgoprojWorkflowV1alpha1SynchronizationStatus): [optional] # noqa: E501 - task_results_completion_status ({str: (bool,)}): TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.. [optional] # noqa: E501 + task_results_completion_status ({str: (bool,)}): TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.. [optional] # noqa: E501 """ _check_type = kwargs.pop('_check_type', True) @@ -310,7 +310,7 @@ def __init__(self, *args, **kwargs): # noqa: E501 stored_templates ({str: (IoArgoprojWorkflowV1alpha1Template,)}): StoredTemplates is a mapping between a template ref and the node's status.. [optional] # noqa: E501 stored_workflow_template_spec (IoArgoprojWorkflowV1alpha1WorkflowSpec): [optional] # noqa: E501 synchronization (IoArgoprojWorkflowV1alpha1SynchronizationStatus): [optional] # noqa: E501 - task_results_completion_status ({str: (bool,)}): TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection.. [optional] # noqa: E501 + task_results_completion_status ({str: (bool,)}): TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection.. [optional] # noqa: E501 """ _check_type = kwargs.pop('_check_type', True) diff --git a/sdks/python/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md b/sdks/python/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md index 06b3715f3745..c46412b8d3c8 100644 --- a/sdks/python/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md +++ b/sdks/python/client/docs/IoArgoprojWorkflowV1alpha1WorkflowStatus.md @@ -23,7 +23,7 @@ Name | Type | Description | Notes **stored_templates** | [**{str: (IoArgoprojWorkflowV1alpha1Template,)}**](IoArgoprojWorkflowV1alpha1Template.md) | StoredTemplates is a mapping between a template ref and the node's status. | [optional] **stored_workflow_template_spec** | [**IoArgoprojWorkflowV1alpha1WorkflowSpec**](IoArgoprojWorkflowV1alpha1WorkflowSpec.md) | | [optional] **synchronization** | [**IoArgoprojWorkflowV1alpha1SynchronizationStatus**](IoArgoprojWorkflowV1alpha1SynchronizationStatus.md) | | [optional] -**task_results_completion_status** | **{str: (bool,)}** | TaskResultsCompletionStatus tracks task result completion status (mapped by pod name). Used to prevent premature archiving and garbage collection. | [optional] +**task_results_completion_status** | **{str: (bool,)}** | TaskResultsCompletionStatus tracks task result completion status (mapped by node ID). Used to prevent premature archiving and garbage collection. | [optional] **any string name** | **bool, date, datetime, dict, float, int, list, str, none_type** | any string name can be used but the value must be the correct type | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 315a79b12333..16bf6c0fa88a 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -537,6 +537,10 @@ func makePodsPhase(ctx context.Context, woc *wfOperationCtx, phase apiv1.PodPhas if err != nil { panic(err) } + if phase == apiv1.PodSucceeded { + nodeID := woc.nodeID(&pod) + woc.wf.Status.MarkTaskResultComplete(nodeID) + } } } } diff --git a/workflow/controller/exit_handler_test.go b/workflow/controller/exit_handler_test.go index 32bd3dbe8f6c..daed6956a6b2 100644 --- a/workflow/controller/exit_handler_test.go +++ b/workflow/controller/exit_handler_test.go @@ -209,6 +209,7 @@ func TestStepsOnExitTmplWithArt(t *testing.T) { }, } woc.wf.Status.Nodes[idx] = node + woc.wf.Status.MarkTaskResultComplete(node.ID) } } woc1 := newWorkflowOperationCtx(woc.wf, controller) @@ -283,6 +284,7 @@ func TestDAGOnExitTmplWithArt(t *testing.T) { }, } woc.wf.Status.Nodes[idx] = node + woc.wf.Status.MarkTaskResultComplete(node.ID) } } woc1 := newWorkflowOperationCtx(woc.wf, controller) @@ -383,6 +385,7 @@ func TestStepsTmplOnExit(t *testing.T) { }, } woc2.wf.Status.Nodes[idx] = node + woc.wf.Status.MarkTaskResultComplete(node.ID) } } @@ -487,6 +490,7 @@ func TestDAGOnExit(t *testing.T) { }, } woc2.wf.Status.Nodes[idx] = node + woc.wf.Status.MarkTaskResultComplete(node.ID) } } woc3 := newWorkflowOperationCtx(woc2.wf, controller) diff --git a/workflow/controller/hooks_test.go b/workflow/controller/hooks_test.go index 7a4ab353cb81..77b49230ea2b 100644 --- a/workflow/controller/hooks_test.go +++ b/workflow/controller/hooks_test.go @@ -997,7 +997,7 @@ spec: assert.Equal(t, wfv1.NodePending, node.Phase) makePodsPhase(ctx, woc, apiv1.PodFailed) woc = newWorkflowOperationCtx(woc.wf, controller) - err := woc.podReconciliation(ctx) + err, _ := woc.podReconciliation(ctx) assert.NoError(t, err) node = woc.wf.Status.Nodes.FindByDisplayName("hook-failures.hooks.failure") assert.NotNil(t, node) @@ -1140,6 +1140,7 @@ spec: pod, _ := podcs.Get(ctx, "hook-running", metav1.GetOptions{}) pod.Status.Phase = apiv1.PodSucceeded updatedPod, _ := podcs.Update(ctx, pod, metav1.UpdateOptions{}) + woc.wf.Status.MarkTaskResultComplete(woc.nodeID(pod)) _ = woc.controller.podInformer.GetStore().Update(updatedPod) woc = newWorkflowOperationCtx(woc.wf, controller) woc.operate(ctx) @@ -1231,6 +1232,7 @@ spec: pod.Status.Phase = apiv1.PodSucceeded updatedPod, _ := podcs.Update(ctx, &pod, metav1.UpdateOptions{}) _ = woc.controller.podInformer.GetStore().Update(updatedPod) + woc.wf.Status.MarkTaskResultComplete(woc.nodeID(&pod)) woc = newWorkflowOperationCtx(woc.wf, controller) woc.operate(ctx) assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 69996826c060..6b93dcf0346b 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -307,7 +307,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration() } else { woc.workflowDeadline = woc.getWorkflowDeadline() - err = woc.podReconciliation(ctx) + err, podReconciliationCompleted := woc.podReconciliation(ctx) if err == nil { woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() } @@ -318,6 +318,12 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { // TODO: we need to re-add to the workqueue, but should happen in caller return } + + if !podReconciliationCompleted { + woc.log.WithField("workflow", woc.wf.ObjectMeta.Name).Info("pod reconciliation didn't complete, will retry") + woc.requeue() + return + } } if woc.ShouldSuspend() { @@ -1090,15 +1096,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate // pods and update the node state before continuing the evaluation of the workflow. // Records all pods which were observed completed, which will be labeled completed=true // after successful persist of the workflow. -func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { +// returns whether pod reconciliation successfully completed +func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) { podList, err := woc.getAllWorkflowPods() if err != nil { - return err + return err, false } seenPods := make(map[string]*apiv1.Pod) seenPodLock := &sync.Mutex{} wfNodesLock := &sync.RWMutex{} podRunningCondition := wfv1.Condition{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse} + taskResultIncomplete := false performAssessment := func(pod *apiv1.Pod) { if pod == nil { return @@ -1117,6 +1125,12 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { node, err := woc.wf.Status.Nodes.Get(nodeID) if err == nil { if newState := woc.assessNodeStatus(pod, node); newState != nil { + // Check whether its taskresult is in an incompleted state. + if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) { + woc.log.WithFields(log.Fields{"nodeID": newState.ID}).Debug("Taskresult of the node not yet completed") + taskResultIncomplete = true + return + } woc.addOutputsToGlobalScope(newState.Outputs) if newState.MemoizationStatus != nil { if newState.Succeeded() { @@ -1160,6 +1174,12 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { wg.Wait() + // If true, it means there are some nodes which have outputs we wanted to be marked succeed, but the node's taskresults didn't completed. + // We should make sure the taskresults processing is complete as it will be possible to reference it in the next step. + if taskResultIncomplete { + return nil, false + } + woc.wf.Status.Conditions.UpsertCondition(podRunningCondition) // Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in @@ -1201,7 +1221,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted") } } - return nil + return nil, !taskResultIncomplete } func (woc *wfOperationCtx) nodeID(pod *apiv1.Pod) string { @@ -1367,7 +1387,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus if x, ok := pod.Annotations[common.AnnotationKeyReportOutputsCompleted]; ok { woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/") - resultName := pod.GetName() + resultName := woc.nodeID(pod) if x == "true" { woc.wf.Status.MarkTaskResultComplete(resultName) } else { diff --git a/workflow/controller/operator_concurrency_test.go b/workflow/controller/operator_concurrency_test.go index bd9b6393455e..6e0d5c6486cf 100644 --- a/workflow/controller/operator_concurrency_test.go +++ b/workflow/controller/operator_concurrency_test.go @@ -196,7 +196,7 @@ func TestSemaphoreTmplLevel(t *testing.T) { woc_two.operate(ctx) // Check Node status - err = woc_two.podReconciliation(ctx) + err, _ = woc_two.podReconciliation(ctx) assert.NoError(t, err) for _, node := range woc_two.wf.Status.Nodes { assert.Equal(t, wfv1.NodePending, node.Phase) @@ -257,7 +257,7 @@ func TestSemaphoreScriptTmplLevel(t *testing.T) { woc_two.operate(ctx) // Check Node status - err = woc_two.podReconciliation(ctx) + err, _ = woc_two.podReconciliation(ctx) assert.NoError(t, err) for _, node := range woc_two.wf.Status.Nodes { assert.Equal(t, wfv1.NodePending, node.Phase) @@ -319,7 +319,7 @@ func TestSemaphoreScriptConfigMapInDifferentNamespace(t *testing.T) { woc_two.operate(ctx) // Check Node status - err = woc_two.podReconciliation(ctx) + err, _ = woc_two.podReconciliation(ctx) assert.NoError(t, err) for _, node := range woc_two.wf.Status.Nodes { assert.Equal(t, wfv1.NodePending, node.Phase) @@ -379,7 +379,7 @@ func TestSemaphoreResourceTmplLevel(t *testing.T) { woc_two.operate(ctx) // Check Node status - err = woc_two.podReconciliation(ctx) + err, _ = woc_two.podReconciliation(ctx) assert.NoError(t, err) for _, node := range woc_two.wf.Status.Nodes { assert.Equal(t, wfv1.NodePending, node.Phase) @@ -416,7 +416,7 @@ func TestSemaphoreWithOutConfigMap(t *testing.T) { wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{}) assert.NoError(t, err) woc := newWorkflowOperationCtx(wf, controller) - err = woc.podReconciliation(ctx) + err, _ = woc.podReconciliation(ctx) assert.NoError(t, err) for _, node := range woc.wf.Status.Nodes { assert.Equal(t, wfv1.NodePending, node.Phase) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 350e34fb68f0..42199282570f 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1689,6 +1689,8 @@ func TestWorkflowStepRetry(t *testing.T) { wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) assert.Nil(t, err) woc = newWorkflowOperationCtx(wf, controller) + nodeID := woc.nodeID(&pods.Items[0]) + woc.wf.Status.MarkTaskResultComplete(nodeID) woc.operate(ctx) // fail the second pod @@ -10082,3 +10084,117 @@ status: woc.operate(ctx) } + +var needReconcileWorklfow = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: steps-need-reconcile +spec: + entrypoint: hello-hello-hello + arguments: + parameters: + - name: message1 + value: hello world + - name: message2 + value: foobar + # This spec contains two templates: hello-hello-hello and whalesay + templates: + - name: hello-hello-hello + # Instead of just running a container + # This template has a sequence of steps + steps: + - - name: hello1 # hello1 is run before the following steps + continueOn: {} + template: whalesay + arguments: + parameters: + - name: message + value: "hello1" + - name: workflow_artifact_key + value: "{{ workflow.parameters.message2}}" + - - name: hello2a # double dash => run after previous step + template: whalesay + arguments: + parameters: + - name: message + value: "{{=steps['hello1'].outputs.parameters['workflow_artifact_key']}}" + + # This is the same template as from the previous example + - name: whalesay + inputs: + parameters: + - name: message + outputs: + parameters: + - name: workflow_artifact_key + value: '{{workflow.name}}' + script: + image: python:alpine3.6 + command: [python] + env: + - name: message + value: "{{inputs.parameters.message}}" + source: | + import random + i = random.randint(1, 100) + print(i)` + +// TestWorkflowNeedReconcile test whether a workflow need reconcile taskresults. +func TestWorkflowNeedReconcile(t *testing.T) { + cancel, controller := newController() + defer cancel() + ctx := context.Background() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + wf := wfv1.MustUnmarshalWorkflow(needReconcileWorklfow) + wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{}) + assert.Nil(t, err) + wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + pods, err := listPods(woc) + assert.Nil(t, err) + assert.Equal(t, 1, len(pods.Items)) + + // complete the first pod + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + woc = newWorkflowOperationCtx(wf, controller) + err, podReconciliationCompleted := woc.podReconciliation(ctx) + assert.Nil(t, err) + assert.False(t, podReconciliationCompleted) + + for idx, node := range woc.wf.Status.Nodes { + if strings.Contains(node.Name, ".hello1") { + node.Outputs = &wfv1.Outputs{ + Parameters: []wfv1.Parameter{ + { + Name: "workflow_artifact_key", + Value: wfv1.AnyStringPtr("steps-need-reconcile"), + }, + }, + } + woc.wf.Status.Nodes[idx] = node + woc.wf.Status.MarkTaskResultComplete(node.ID) + } + } + err, podReconciliationCompleted = woc.podReconciliation(ctx) + assert.Nil(t, err) + assert.True(t, podReconciliationCompleted) + woc.operate(ctx) + + // complete the second pod + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + woc = newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + pods, err = listPods(woc) + assert.Nil(t, err) + if assert.Equal(t, 2, len(pods.Items)) { + assert.Equal(t, "hello1", pods.Items[0].Spec.Containers[1].Env[0].Value) + assert.Equal(t, "steps-need-reconcile", pods.Items[1].Spec.Containers[1].Env[0].Value) + } +}