Skip to content

Commit

Permalink
fix(controller): Not updating StoredWorkflowSpec when WFT changed dur…
Browse files Browse the repository at this point in the history
…ing workflow running (#6342)

Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>
  • Loading branch information
sarabala1979 committed Jul 15, 2021
1 parent d147601 commit 6041ffe
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
14 changes: 9 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3275,21 +3275,25 @@ func (woc *wfOperationCtx) setStoredWfSpec() error {
wfDefault = &wfv1.Workflow{}
}

if woc.needsStoredWfSpecUpdate() {
workflowTemplateSpec := woc.wf.Status.StoredWorkflowSpec

// Load the spec from WorkflowTemplate in first time.
if woc.wf.Status.StoredWorkflowSpec == nil {
wftHolder, err := woc.fetchWorkflowSpec()
if err != nil {
return err
}

// Join WFT and WfDefault metadata to Workflow metadata.
wfutil.JoinWorkflowMetaData(&woc.wf.ObjectMeta, wftHolder.GetWorkflowMetadata(), &wfDefault.ObjectMeta)

workflowTemplateSpec = wftHolder.GetWorkflowSpec()
}
// Update the Entrypoint, ShutdownStrategy and Suspend
if woc.needsStoredWfSpecUpdate() {
// Join workflow, workflow template, and workflow default metadata to workflow spec.
mergedWf, err := wfutil.JoinWorkflowSpec(&woc.wf.Spec, wftHolder.GetWorkflowSpec(), &wfDefault.Spec)
mergedWf, err := wfutil.JoinWorkflowSpec(&woc.wf.Spec, workflowTemplateSpec, &wfDefault.Spec)
if err != nil {
return err
}

woc.wf.Status.StoredWorkflowSpec = &mergedWf.Spec
woc.updated = true
} else if woc.controller.Config.WorkflowRestrictions.MustNotChangeSpec() {
Expand Down
53 changes: 53 additions & 0 deletions workflow/controller/operator_workflow_template_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,56 @@ func TestSuspendResumeWorkflowTemplateRef(t *testing.T) {
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend)
}

const wfTmplUpt = `
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-whalesay-template
namespace: default
spec:
templates:
- name: hello-hello-hello
steps:
- - name: hello1
template: whalesay
arguments:
parameters: [{name: message, value: "hello1"}]
- - name: hello2a
template: whalesay
arguments:
parameters: [{name: message, value: "hello2a"}]
- name: hello2b
template: whalesay
arguments:
parameters: [{name: message, value: "hello2b"}]
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`

func TestWorkflowTemplateUpdateScenario(t *testing.T) {

wf := wfv1.MustUnmarshalWorkflow(wfWithTmplRef)
cancel, controller := newController(wf, wfv1.MustUnmarshalWorkflowTemplate(wfTmpl))
defer cancel()
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.NotEmpty(t, woc.wf.Status.StoredWorkflowSpec)
assert.NotEmpty(t, woc.wf.Status.StoredWorkflowSpec.Templates[0].Container)

cancel, controller = newController(woc.wf, wfv1.MustUnmarshalWorkflowTemplate(wfTmplUpt))
defer cancel()
ctx = context.Background()
woc1 := newWorkflowOperationCtx(woc.wf, controller)
woc1.operate(ctx)
assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec)
assert.Equal(t, woc.wf.Status.StoredWorkflowSpec, woc1.wf.Status.StoredWorkflowSpec)
}
6 changes: 6 additions & 0 deletions workflow/util/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func JoinWorkflowSpec(wfSpec, wftSpec, wfDefaultSpec *wfv1.WorkflowSpec) (*wfv1.
return nil, err
}
}

// This condition will update the workflow Spec suspend value if merged value is different.
// This scenario will happen when Workflow with WorkflowTemplateRef has suspend template
if wfSpec.Suspend != targetWf.Spec.Suspend {
targetWf.Spec.Suspend = wfSpec.Suspend
}
return &targetWf, nil
}

Expand Down

0 comments on commit 6041ffe

Please sign in to comment.