From 0246d184add04e44f77ffbe00e796b3adaf535d2 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Fri, 9 Aug 2019 04:44:59 +0900 Subject: [PATCH] Use cache to retrieve WorkflowTemplates (#1534) --- workflow/controller/controller.go | 32 +++++++++------ workflow/controller/controller_test.go | 19 +++++++-- workflow/controller/operator.go | 2 +- workflow/templateresolution/context.go | 45 +++++++++++++++------ workflow/templateresolution/context_test.go | 10 ++--- workflow/validate/validate.go | 5 ++- 6 files changed, 78 insertions(+), 35 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 2f4fca6195db..d37627329452 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -7,6 +7,8 @@ import ( "time" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions" + wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" @@ -54,20 +56,22 @@ type WorkflowController struct { wfclientset wfclientset.Interface // datastructures to support the processing of workflows and workflow pods - wfInformer cache.SharedIndexInformer - podInformer cache.SharedIndexInformer - wfQueue workqueue.RateLimitingInterface - podQueue workqueue.RateLimitingInterface - completedPods chan string - gcPods chan string // pods to be deleted depend on GC strategy - throttler Throttler - wfDBctx sqldb.DBRepository + wfInformer cache.SharedIndexInformer + wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer + podInformer cache.SharedIndexInformer + wfQueue workqueue.RateLimitingInterface + podQueue workqueue.RateLimitingInterface + completedPods chan string + gcPods chan string // pods to be deleted depend on GC strategy + throttler Throttler + wfDBctx sqldb.DBRepository } const ( - workflowResyncPeriod = 20 * time.Minute - workflowMetricsResyncPeriod = 1 * time.Minute - podResyncPeriod = 30 * time.Minute + workflowResyncPeriod = 20 * time.Minute + workflowTemplateResyncPeriod = 20 * time.Minute + workflowMetricsResyncPeriod = 1 * time.Minute + podResyncPeriod = 30 * time.Minute ) // NewWorkflowController instantiates a new WorkflowController @@ -145,16 +149,20 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.Config.Namespace, workflowResyncPeriod, wfc.tweakWorkflowlist) + informerFactory := wfextv.NewSharedInformerFactory(wfc.wfclientset, workflowTemplateResyncPeriod) + wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() + wfc.addWorkflowInformerHandler() wfc.podInformer = wfc.newPodInformer() go wfc.wfInformer.Run(ctx.Done()) + go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.podLabeler(ctx.Done()) go wfc.podGarbageCollector(ctx.Done()) // Wait for all involved caches to be synced, before processing items from the queue is started - for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.podInformer} { + for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} { if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { log.Error("Timed out waiting for caches to sync") return diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 6a1fa220af17..3e18f0b6d6c5 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -2,10 +2,12 @@ package controller import ( "bytes" + "context" "encoding/json" "io" "io/ioutil" "testing" + "time" "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" @@ -13,9 +15,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" + wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions" "github.com/argoproj/argo/workflow/config" ) @@ -42,13 +46,22 @@ spec: ` func newController() *WorkflowController { + wfclientset := fakewfclientset.NewSimpleClientset() + informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute) + wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates() + ctx := context.Background() + go wftmplInformer.Informer().Run(ctx.Done()) + if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) { + panic("Timed out waiting for caches to sync") + } return &WorkflowController{ Config: config.WorkflowControllerConfig{ ExecutorImage: "executor:latest", }, - kubeclientset: fake.NewSimpleClientset(), - wfclientset: fakewfclientset.NewSimpleClientset(), - completedPods: make(chan string, 512), + kubeclientset: fake.NewSimpleClientset(), + wfclientset: wfclientset, + completedPods: make(chan string, 512), + wftmplInformer: wftmplInformer, } } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 2e51d5225904..044e339b668e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -114,7 +114,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper completedPods: make(map[string]bool), succeededPods: make(map[string]bool), deadline: time.Now().UTC().Add(maxOperationTime), - tmplCtx: templateresolution.NewContext(wfc.wfclientset, wf.Namespace, wf), + tmplCtx: templateresolution.NewContext(wfc.wftmplInformer.Lister().WorkflowTemplates(wf.Namespace), wf), } if woc.wf.Status.Nodes == nil { diff --git a/workflow/templateresolution/context.go b/workflow/templateresolution/context.go index 2227cd0471ad..32d46db53a5c 100644 --- a/workflow/templateresolution/context.go +++ b/workflow/templateresolution/context.go @@ -3,7 +3,7 @@ package templateresolution import ( "github.com/argoproj/argo/errors" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" + typed "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" log "github.com/sirupsen/logrus" apierr "k8s.io/apimachinery/pkg/api/errors" @@ -13,22 +13,43 @@ import ( // maxResolveDepth is the limit of template reference resolution. const maxResolveDepth int = 10 +// workflowTemplateInterfaceWrapper is an internal struct to wrap clientset. +type workflowTemplateInterfaceWrapper struct { + clientset typed.WorkflowTemplateInterface +} + +// Get retrieves the WorkflowTemplate of a given name. +func (wrapper *workflowTemplateInterfaceWrapper) Get(name string) (*wfv1.WorkflowTemplate, error) { + return wrapper.clientset.Get(name, metav1.GetOptions{}) +} + +// WorkflowTemplateNamespaceLister helps get WorkflowTemplates. +type WorkflowTemplateNamespacedGetter interface { + // Get retrieves the WorkflowTemplate from the indexer for a given name. + Get(name string) (*wfv1.WorkflowTemplate, error) +} + // Context is a context of template search. type Context struct { - // wfClientset is the clientset to get workflow templates. - wfClientset wfclientset.Interface - // namespace is the namespace of template search. - namespace string + // wftmplGetter is an interface to get WorkflowTemplates. + wftmplGetter WorkflowTemplateNamespacedGetter // tmplBase is the base of local template search. tmplBase wfv1.TemplateGetter } // NewContext returns new Context. -func NewContext(wfClientset wfclientset.Interface, namespace string, tmplBase wfv1.TemplateGetter) *Context { +func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, tmplBase wfv1.TemplateGetter) *Context { + return &Context{ + wftmplGetter: wftmplGetter, + tmplBase: tmplBase, + } +} + +// NewContext returns new Context. +func NewContextFromClientset(clientset typed.WorkflowTemplateInterface, tmplBase wfv1.TemplateGetter) *Context { return &Context{ - wfClientset: wfClientset, - namespace: namespace, - tmplBase: tmplBase, + wftmplGetter: &workflowTemplateInterfaceWrapper{clientset: clientset}, + tmplBase: tmplBase, } } @@ -43,7 +64,7 @@ func (ctx *Context) GetTemplateByName(name string) (*wfv1.Template, error) { // GetTemplateFromRef returns a template found by a given template ref. func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Template, error) { - wftmpl, err := ctx.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(ctx.namespace).Get(tmplRef.Name, metav1.GetOptions{}) + wftmpl, err := ctx.wftmplGetter.Get(tmplRef.Name) if err != nil { if apierr.IsNotFound(err) { return nil, errors.Errorf(errors.CodeNotFound, "workflow template %s not found", tmplRef.Name) @@ -82,7 +103,7 @@ func (ctx *Context) GetTemplate(tmplHolder wfv1.TemplateHolder) (*wfv1.Template, func (ctx *Context) GetTemplateBase(tmplHolder wfv1.TemplateHolder) (wfv1.TemplateGetter, error) { tmplRef := tmplHolder.GetTemplateRef() if tmplRef != nil { - wftmpl, err := ctx.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(ctx.namespace).Get(tmplRef.Name, metav1.GetOptions{}) + wftmpl, err := ctx.wftmplGetter.Get(tmplRef.Name) if err != nil && apierr.IsNotFound(err) { return nil, errors.Errorf(errors.CodeNotFound, "workflow template %s not found", tmplRef.Name) } @@ -140,5 +161,5 @@ func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateHolder, depth in // WithTemplateBase creates new context with a wfv1.TemplateGetter. func (ctx *Context) WithTemplateBase(tmplBase wfv1.TemplateGetter) *Context { - return NewContext(ctx.wfClientset, ctx.namespace, tmplBase) + return NewContext(ctx.wftmplGetter, tmplBase) } diff --git a/workflow/templateresolution/context_test.go b/workflow/templateresolution/context_test.go index 9773d17658f9..cace58241044 100644 --- a/workflow/templateresolution/context_test.go +++ b/workflow/templateresolution/context_test.go @@ -107,7 +107,7 @@ spec: func TestGetTemplateByName(t *testing.T) { wfClientset := fakewfclientset.NewSimpleClientset() wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml) - ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl) + ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl) tmpl, err := ctx.GetTemplateByName("whalesay") if !assert.NoError(t, err) { @@ -131,7 +131,7 @@ func TestGetTemplateFromRef(t *testing.T) { t.Fatal(err) } wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml) - ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl) + ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl) // Get the template of existing template reference. tmplRef := wfv1.TemplateRef{Name: "some-workflow-template", Template: "whalesay"} @@ -164,7 +164,7 @@ func TestGetTemplate(t *testing.T) { t.Fatal(err) } wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml) - ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl) + ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl) // Get the template of existing template name. tmplHolder := wfv1.Template{Template: "whalesay"} @@ -211,7 +211,7 @@ func TestGetTemplateBase(t *testing.T) { t.Fatal(err) } wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml) - ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl) + ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl) // Get the template base of existing template name. tmplHolder := wfv1.Template{Template: "whalesay"} @@ -266,7 +266,7 @@ func TestResolveTemplate(t *testing.T) { t.Fatal(err) } wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml) - ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl) + ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl) // Get the template of template name. tmplHolder := wfv1.Template{Template: "whalesay"} diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index b58a1c0abfd7..ea4cd5f83b8b 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -85,8 +85,9 @@ func ValidateWorkflow(wfClientset wfclientset.Interface, namespace string, wf *w if wf.Namespace != "" { namespace = wf.Namespace } + ctx := newTemplateValidationCtx(wfClientset, namespace, wf, opts) - tmplCtx := templateresolution.NewContext(wfClientset, namespace, wf) + tmplCtx := templateresolution.NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(namespace), wf) err := validateWorkflowFieldNames(wf.Spec.Templates) if err != nil { @@ -159,7 +160,7 @@ func ValidateWorkflowTemplate(wfClientset wfclientset.Interface, namespace strin namespace = wftmpl.Namespace } ctx := newTemplateValidationCtx(wfClientset, namespace, wftmpl, ValidateOpts{}) - tmplCtx := templateresolution.NewContext(wfClientset, namespace, wftmpl) + tmplCtx := templateresolution.NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(namespace), wftmpl) // Check if all templates can be resolved. for _, template := range wftmpl.Spec.Templates {