From f5ab3ec1d57cb7caca5f3f8096cd04d53d7d8653 Mon Sep 17 00:00:00 2001 From: Daniil Stepanenko Date: Thu, 7 Nov 2024 12:04:53 +0300 Subject: [PATCH 1/3] WIP Signed-off-by: Daniil Stepanenko --- pkg/addon-operator/ensure_crds.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/addon-operator/ensure_crds.go b/pkg/addon-operator/ensure_crds.go index 72ba9c5e..63d88ed0 100644 --- a/pkg/addon-operator/ensure_crds.go +++ b/pkg/addon-operator/ensure_crds.go @@ -23,6 +23,13 @@ import ( "github.com/flant/kube-client/client" ) +// 1Mb - maximum size of kubernetes object +// if we take less, we have to handle io.ErrShortBuffer error and increase the buffer +// take more does not make any sense due to kubernetes limitations +// Considering that etcd has a default value of 1.5Mb, it was decided to set it to 2Mb, +// so that in most cases we would get a more informative error from Kubernetes, not just "short buffer" +const bufSize = 2 * 1024 * 1024 + var crdGVR = schema.GroupVersionResource{ Group: "apiextensions.k8s.io", Version: "v1", @@ -243,12 +250,9 @@ func (cp *CRDsInstaller) getCRDFromCluster(ctx context.Context, crdName string) // NewCRDsInstaller creates new installer for CRDs func NewCRDsInstaller(client *client.Client, crdFilesPaths []string, crdExtraLabels map[string]string) (*CRDsInstaller, error) { return &CRDsInstaller{ - k8sClient: client.Dynamic(), - crdFilesPaths: crdFilesPaths, - // 1Mb - maximum size of kubernetes object - // if we take less, we have to handle io.ErrShortBuffer error and increase the buffer - // take more does not make any sense due to kubernetes limitations - buffer: make([]byte, 1*1024*1024), + k8sClient: client.Dynamic(), + crdFilesPaths: crdFilesPaths, + buffer: make([]byte, bufSize), k8sTasks: &multierror.Group{}, crdExtraLabels: crdExtraLabels, appliedGVKs: make([]string, 0), From de4d126367d0e663fea93e785b4b4abda7df14e2 Mon Sep 17 00:00:00 2001 From: Daniil Stepanenko Date: Thu, 7 Nov 2024 12:49:16 +0300 Subject: [PATCH 2/3] WIP Signed-off-by: Daniil Stepanenko --- pkg/addon-operator/ensure_crds.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/addon-operator/ensure_crds.go b/pkg/addon-operator/ensure_crds.go index 63d88ed0..40a75340 100644 --- a/pkg/addon-operator/ensure_crds.go +++ b/pkg/addon-operator/ensure_crds.go @@ -161,13 +161,13 @@ func (cp *CRDsInstaller) putCRDToCluster(ctx context.Context, crdReader io.Reade if len(crd.Spec.Group) > 0 { crdGroup = crd.Spec.Group } else { - return fmt.Errorf("Couldn't find CRD's .group key") + return fmt.Errorf("process %s: couldn't find CRD's .group key", crd.Name) } if len(crd.Spec.Names.Kind) > 0 { crdKind = crd.Spec.Names.Kind } else { - return fmt.Errorf("Couldn't find CRD's .spec.names.kind key") + return fmt.Errorf("process %s: couldn't find CRD's .spec.names.kind key", crd.Name) } if len(crd.Spec.Versions) > 0 { @@ -175,7 +175,7 @@ func (cp *CRDsInstaller) putCRDToCluster(ctx context.Context, crdReader io.Reade crdVersions = append(crdVersions, version.Name) } } else { - return fmt.Errorf("Couldn't find CRD's .spec.versions key") + return fmt.Errorf("process %s: couldn't find CRD's .spec.versions key", crd.Name) } cp.appliedGVKsLock.Lock() for _, crdVersion := range crdVersions { From c4e1203a3b44d49258669dbb9fa811ff018e8770 Mon Sep 17 00:00:00 2001 From: Daniil Stepanenko Date: Thu, 7 Nov 2024 16:33:23 +0300 Subject: [PATCH 3/3] WIP Signed-off-by: Daniil Stepanenko --- pkg/addon-operator/ensure_crds.go | 50 +++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/pkg/addon-operator/ensure_crds.go b/pkg/addon-operator/ensure_crds.go index 40a75340..3a586d42 100644 --- a/pkg/addon-operator/ensure_crds.go +++ b/pkg/addon-operator/ensure_crds.go @@ -42,13 +42,7 @@ func (op *AddonOperator) EnsureCRDs(module *modules.BasicModule) ([]string, erro return nil, nil } - result := new(multierror.Error) - cp, err := NewCRDsInstaller(op.KubeClient(), module.GetCRDFilesPaths(), op.CRDExtraLabels) - if err != nil { - result = multierror.Append(result, err) - return nil, result - } - + cp := NewCRDsInstaller(op.KubeClient(), module.GetCRDFilesPaths(), WithExtraLabels(op.CRDExtraLabels)) if cp == nil { return nil, nil } @@ -59,6 +53,18 @@ func (op *AddonOperator) EnsureCRDs(module *modules.BasicModule) ([]string, erro return cp.appliedGVKs, nil } +func WithExtraLabels(labels map[string]string) InstallerOption { + return func(installer *CRDsInstaller) { + installer.crdExtraLabels = labels + } +} + +func WithFileFilter(fn func(path string) bool) InstallerOption { + return func(installer *CRDsInstaller) { + installer.fileFilter = fn + } +} + // CRDsInstaller simultaneously installs CRDs from specified directory type CRDsInstaller struct { k8sClient dynamic.Interface @@ -69,6 +75,7 @@ type CRDsInstaller struct { k8sTasks *multierror.Group crdExtraLabels map[string]string + fileFilter func(path string) bool appliedGVKsLock sync.Mutex // list of GVKs, applied to the cluster @@ -79,6 +86,10 @@ func (cp *CRDsInstaller) Run(ctx context.Context) *multierror.Error { result := new(multierror.Error) for _, crdFilePath := range cp.crdFilesPaths { + if cp.fileFilter != nil && !cp.fileFilter(crdFilePath) { + continue + } + err := cp.processCRD(ctx, crdFilePath) if err != nil { err = fmt.Errorf("error occurred during processing %q file: %w", crdFilePath, err) @@ -247,14 +258,21 @@ func (cp *CRDsInstaller) getCRDFromCluster(ctx context.Context, crdName string) return crd, nil } +type InstallerOption func(*CRDsInstaller) + // NewCRDsInstaller creates new installer for CRDs -func NewCRDsInstaller(client *client.Client, crdFilesPaths []string, crdExtraLabels map[string]string) (*CRDsInstaller, error) { - return &CRDsInstaller{ - k8sClient: client.Dynamic(), - crdFilesPaths: crdFilesPaths, - buffer: make([]byte, bufSize), - k8sTasks: &multierror.Group{}, - crdExtraLabels: crdExtraLabels, - appliedGVKs: make([]string, 0), - }, nil +func NewCRDsInstaller(client *client.Client, crdFilesPaths []string, options ...InstallerOption) *CRDsInstaller { + i := &CRDsInstaller{ + k8sClient: client.Dynamic(), + crdFilesPaths: crdFilesPaths, + buffer: make([]byte, bufSize), + k8sTasks: &multierror.Group{}, + appliedGVKs: make([]string, 0), + } + + for _, opt := range options { + opt(i) + } + + return i }