Skip to content

Commit

Permalink
Resolve WorkflowTemplate lazily (#1655)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaniwaki authored and sarabala1979 committed Oct 8, 2019
1 parent d15994b commit b5dcac8
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 16 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (woc *wfOperationCtx) getResolvedTemplate(node *wfv1.NodeStatus, tmpl wfv1.
woc.log.Debugf("Found a resolved template for node %s", node.Name)
if node.WorkflowTemplateName != "" {
woc.log.Debugf("Switch the template context to %s", node.WorkflowTemplateName)
newTmplCtx, err := tmplCtx.OnWorkflowTemplate(node.WorkflowTemplateName)
newTmplCtx, err := tmplCtx.WithLazyWorkflowTemplate(woc.wf.Namespace, node.WorkflowTemplateName)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
// We add the aggregate outputs of our children to the scope as a JSON list
var childNodes []wfv1.NodeStatus
for _, node := range woc.wf.Status.Nodes {
if node.BoundaryID == stepsCtx.boundaryID && strings.HasPrefix(node.Name, childNodeName+"(") {
if node.BoundaryID == stepsCtx.boundaryID && strings.HasPrefix(node.Name, childNodeName+"(") && node.Type != wfv1.NodeTypeSkipped {
childNodes = append(childNodes, node)
}
}
Expand Down
10 changes: 3 additions & 7 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,7 @@ func (ctx *Context) WithTemplateBase(tmplBase wfv1.TemplateGetter) *Context {
return NewContext(ctx.wftmplGetter, tmplBase)
}

// OnWorkflowTemplate creates new context with the wfv1.WorkflowTemplate of the given name.
func (ctx *Context) OnWorkflowTemplate(name string) (*Context, error) {
wftmpl, err := ctx.wftmplGetter.Get(name)
if err != nil {
return nil, err
}
return NewContext(ctx.wftmplGetter, wftmpl), nil
// WithLazyWorkflowTemplate creates new context with the wfv1.WorkflowTemplate of the given name with lazy loading.
func (ctx *Context) WithLazyWorkflowTemplate(namespace, name string) (*Context, error) {
return NewContext(ctx.wftmplGetter, NewLazyWorkflowTemplate(ctx.wftmplGetter, namespace, name)), nil
}
11 changes: 4 additions & 7 deletions workflow/templateresolution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)

func createWorkflowTemplate(wfClientset wfclientset.Interface, yamlStr string) error {
Expand Down Expand Up @@ -411,13 +411,10 @@ func TestOnWorkflowTemplate(t *testing.T) {
}

// Get the template base of existing template name.
newCtx, err := ctx.OnWorkflowTemplate("another-workflow-template")
newCtx, err := ctx.WithLazyWorkflowTemplate("namespace", "another-workflow-template")
if err != nil {
t.Fatal(err)
}
wftmpl, ok := newCtx.tmplBase.(*wfv1.WorkflowTemplate)
if !assert.True(t, ok) {
t.Fatal("tmplBase is not a WorkflowTemplate")
}
assert.Equal(t, "another-workflow-template", wftmpl.Name)
tmpl := newCtx.tmplBase.GetTemplateByName("whalesay")
assert.NotNil(t, tmpl)
}
66 changes: 66 additions & 0 deletions workflow/templateresolution/lazyworkflowtemplate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package templateresolution

import (
"github.com/argoproj/argo/pkg/apis/workflow"
v1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// lazyWorkflowTemplate retrieves WorkflowTemplate lazily.
type lazyWorkflowTemplate struct {
// wftmplGetter is a proxied WorkflowTemplate getter.
wftmplGetter WorkflowTemplateNamespacedGetter
// wftmpl is a cache of retrieved WorkflowTemplate.
wftmpl *wfv1.WorkflowTemplate
// namespace is the namespace of the WorkflowTemplate.
namespace string
// name is the name of the WorkflowTemplate.
name string
}

var _ wfv1.TemplateGetter = &lazyWorkflowTemplate{}

// NewLazyWorkflowTemplate is a public constructor of lazyWorkflowTemplate.
func NewLazyWorkflowTemplate(wftmplGetter WorkflowTemplateNamespacedGetter, namespace, name string) *lazyWorkflowTemplate {
return &lazyWorkflowTemplate{
wftmplGetter: wftmplGetter,
namespace: namespace,
name: name,
}
}

// GetNamespace returns the namespace of the WorkflowTemplate.
func (lwt *lazyWorkflowTemplate) GetNamespace() string {
return lwt.namespace
}

// GetName returns the name of the WorkflowTemplate.
func (lwt *lazyWorkflowTemplate) GetName() string {
return lwt.name
}

// GroupVersionKind returns a GroupVersionKind of WorkflowTemplate.
func (lwt *lazyWorkflowTemplate) GroupVersionKind() schema.GroupVersionKind {
return v1alpha1.SchemeGroupVersion.WithKind(workflow.WorkflowTemplateKind)
}

// GetTemplateByName retrieves a defined template by its name
func (lwt *lazyWorkflowTemplate) GetTemplateByName(name string) *wfv1.Template {
err := lwt.ensureWorkflowTemplate()
if err != nil {
return nil
}
return lwt.wftmpl.GetTemplateByName(name)
}

func (lwt *lazyWorkflowTemplate) ensureWorkflowTemplate() error {
if lwt.wftmpl == nil {
wftmpl, err := lwt.wftmplGetter.Get(lwt.name)
if err != nil {
return err
}
lwt.wftmpl = wftmpl
}
return nil
}

0 comments on commit b5dcac8

Please sign in to comment.