Skip to content

Commit

Permalink
chore: Support substitute global variable in Spec level elements (#5565)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored Apr 10, 2021
1 parent 88917cb commit cc7e310
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 35 deletions.
7 changes: 2 additions & 5 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,8 @@ func TestCheckAndInitWorkflowTmplRef(t *testing.T) {
wftmpl := unmarshalWFTmpl(wfTmpl)
cancel, controller := newController(wf, wftmpl)
defer cancel()
woc := wfOperationCtx{
controller: controller,
wf: wf,
}
err := woc.setExecWorkflow()
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
assert.Equal(t, wftmpl.Spec.WorkflowSpec.Templates, woc.execWf.Spec.Templates)
}
Expand Down
74 changes: 50 additions & 24 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {

// Set the Execute workflow spec for execution
// ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(ctx)
if err != nil {
woc.log.WithError(err).Errorf("Unable to get Workflow Template Reference for workflow")
woc.markWorkflowError(ctx, err)
woc.log.WithError(err).Errorf("Unable to set ExecWorkflow")
return
}

Expand Down Expand Up @@ -241,7 +240,6 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.preExecutionNodePhases[node.ID] = node.Phase
}

// Perform one-time workflow validation
if woc.wf.Status.Phase == wfv1.WorkflowUnknown {
woc.markWorkflowRunning(ctx)
err := woc.createPDBResource(ctx)
Expand All @@ -250,23 +248,6 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.markWorkflowFailed(ctx, msg)
return
}
validateOpts := validate.ValidateOpts{ContainerRuntimeExecutor: woc.getContainerRuntimeExecutor()}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTemplates(woc.wf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())

// Validate the execution wfSpec
wfConditions, err := validate.ValidateWorkflow(wftmplGetter, cwftmplGetter, woc.wf, validateOpts)
if err != nil {
msg := fmt.Sprintf("invalid spec: %s", err.Error())
woc.markWorkflowFailed(ctx, msg)
return
}
woc.setGlobalParameters(woc.execWf.Spec.Arguments)
// If we received conditions during validation (such as SpecWarnings), add them to the Workflow object
if len(*wfConditions) > 0 {
woc.wf.Status.Conditions.JoinConditions(wfConditions)
woc.updated = true
}

woc.workflowDeadline = woc.getWorkflowDeadline()

Expand All @@ -284,7 +265,6 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration()
} else {
woc.setGlobalParameters(woc.execWf.Spec.Arguments)
woc.workflowDeadline = woc.getWorkflowDeadline()
err := woc.podReconciliation(ctx)
if err == nil {
Expand Down Expand Up @@ -3197,23 +3177,53 @@ func (woc *wfOperationCtx) retryStrategy(tmpl *wfv1.Template) *wfv1.RetryStrateg
return woc.execWf.Spec.RetryStrategy
}

func (woc *wfOperationCtx) setExecWorkflow() error {
func (woc *wfOperationCtx) setExecWorkflow(ctx context.Context) error {
if woc.wf.Spec.WorkflowTemplateRef != nil {
err := woc.setStoredWfSpec()
if err != nil {
woc.markWorkflowError(ctx, err)
return err
}
woc.execWf = &wfv1.Workflow{Spec: *woc.wf.Status.StoredWorkflowSpec.DeepCopy()}
woc.volumes = woc.execWf.Spec.DeepCopy().Volumes
} else if woc.controller.Config.WorkflowRestrictions.MustUseReference() {
return fmt.Errorf("workflows must use workflowTemplateRef to be executed when the controller is in reference mode")
err := fmt.Errorf("workflows must use workflowTemplateRef to be executed when the controller is in reference mode")
woc.markWorkflowError(ctx, err)
return err
} else {
err := woc.controller.setWorkflowDefaults(woc.wf)
if err != nil {
woc.markWorkflowError(ctx, err)
return err
}
woc.volumes = woc.wf.Spec.DeepCopy().Volumes
}

// Perform one-time workflow validation
if woc.wf.Status.Phase == wfv1.WorkflowUnknown {
validateOpts := validate.ValidateOpts{ContainerRuntimeExecutor: woc.getContainerRuntimeExecutor()}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTemplates(woc.wf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())

// Validate the execution wfSpec
wfConditions, err := validate.ValidateWorkflow(wftmplGetter, cwftmplGetter, woc.wf, validateOpts)
if err != nil {
msg := fmt.Sprintf("invalid spec: %s", err.Error())
woc.markWorkflowFailed(ctx, msg)
return err
}

// If we received conditions during validation (such as SpecWarnings), add them to the Workflow object
if len(*wfConditions) > 0 {
woc.wf.Status.Conditions.JoinConditions(wfConditions)
woc.updated = true
}
}
woc.setGlobalParameters(woc.execWf.Spec.Arguments)
err := woc.substituteGlobalVariables()
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -3300,3 +3310,19 @@ func (woc *wfOperationCtx) mergedTemplateDefaultsInto(originalTmpl *wfv1.Templat
}
return nil
}

func (woc *wfOperationCtx) substituteGlobalVariables() error {
wfSpec, err := json.Marshal(woc.execWf.Spec)
if err != nil {
return err
}
resolveSpec, err := template.Replace(string(wfSpec), woc.globalParams, true)
if err != nil {
return err
}
err = json.Unmarshal([]byte(resolveSpec), &woc.execWf.Spec)
if err != nil {
return err
}
return nil
}
42 changes: 40 additions & 2 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3146,7 +3146,6 @@ metadata:
spec:
entrypoint: 123
`: {
"Normal WorkflowRunning Workflow Running",
"Warning WorkflowFailed invalid spec: template name '123' undefined",
},
// DAG
Expand Down Expand Up @@ -4734,7 +4733,7 @@ func TestPropagateMaxDurationProcess(t *testing.T) {
assert.NotNil(t, wf)
woc := newWorkflowOperationCtx(wf, controller)
assert.NotNil(t, woc)
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
assert.Zero(t, len(woc.wf.Status.Nodes))

Expand Down Expand Up @@ -6598,6 +6597,45 @@ func TestOnExitDAGStatusCompatibility(t *testing.T) {
assert.Nil(t, nodeB)
}

const testGlobalParamSubstitute = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-diamond-8xw8l
spec:
entrypoint: "whalesay1"
arguments:
parameters:
- name: entrypoint
value: test
- name: mutex
value: mutex1
- name: message
value: mutex1
synchronization:
mutex:
name: "{{workflow.parameters.mutex}}"
templates:
- name: whalesay1
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{workflow.parameters.message}}"]
`

func TestSubstituteGlobalVariables(t *testing.T) {
wf := unmarshalWF(testGlobalParamSubstitute)
cancel, controller := newController(wf)
defer cancel()

// ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
assert.NotNil(t, woc.execWf)
assert.Equal(t, "mutex1", woc.execWf.Spec.Synchronization.Mutex.Name)
}

var wfPending = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
9 changes: 5 additions & 4 deletions workflow/controller/operator_tmpl_default_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestSetTemplateDefault(t *testing.T) {
t.Run("tmplDefaultInConfig", func(t *testing.T) {
wf := unmarshalWF(defaultWf)
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
tmpl := woc.execWf.Spec.Templates[0]
err = woc.mergedTemplateDefaultsInto(&tmpl)
Expand All @@ -153,7 +154,7 @@ func TestSetTemplateDefault(t *testing.T) {
},
}
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
tmpl := woc.execWf.Spec.Templates[0]
err = woc.mergedTemplateDefaultsInto(&tmpl)
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestSetTemplateDefault(t *testing.T) {
},
}
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
tmpl := woc.execWf.Spec.Templates[0]
err = woc.mergedTemplateDefaultsInto(&tmpl)
Expand Down Expand Up @@ -224,7 +225,7 @@ func TestSetTemplateDefault(t *testing.T) {
},
}
woc := newWorkflowOperationCtx(wf, controller)
err := woc.setExecWorkflow()
err := woc.setExecWorkflow(context.Background())
assert.NoError(t, err)
tmpl := woc.execWf.Spec.Templates[0]
err = woc.mergedTemplateDefaultsInto(&tmpl)
Expand Down

0 comments on commit cc7e310

Please sign in to comment.