diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 9dc01d3..2103e96 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" standardv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1" - "github.com/kubevela/kube-trigger/controllers/config" "github.com/kubevela/kube-trigger/controllers/triggerservice" ) @@ -51,7 +50,6 @@ func main() { metricsAddr string enableLeaderElection bool probeAddr string - controllerConfig config.Config ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -95,7 +93,6 @@ func main() { if err = (&triggerservice.Reconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Config: controllerConfig, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TriggerService") os.Exit(1) diff --git a/controllers/config/config.go b/controllers/config/config.go deleted file mode 100644 index a842f11..0000000 --- a/controllers/config/config.go +++ /dev/null @@ -1,21 +0,0 @@ -/* -Copyright 2022 The KubeVela Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -// Config contains options for controllers. 
-type Config struct { -} diff --git a/controllers/triggerservice/triggerservice_controller.go b/controllers/triggerservice/triggerservice_controller.go index c719aec..bd50af0 100644 --- a/controllers/triggerservice/triggerservice_controller.go +++ b/controllers/triggerservice/triggerservice_controller.go @@ -37,7 +37,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" standardv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1" - "github.com/kubevela/kube-trigger/controllers/config" "github.com/kubevela/kube-trigger/controllers/utils" "github.com/kubevela/kube-trigger/pkg/templates" "github.com/kubevela/pkg/cue/cuex" @@ -47,7 +46,6 @@ import ( type Reconciler struct { client.Client Scheme *runtime.Scheme - Config config.Config } var ( diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 2a9272f..2fe4641 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -26,6 +26,7 @@ import ( "github.com/kubevela/kube-trigger/pkg/config" "github.com/kubevela/kube-trigger/pkg/eventhandler" "github.com/kubevela/kube-trigger/pkg/executor" + "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher" sourceregistry "github.com/kubevela/kube-trigger/pkg/source/registry" "github.com/kubevela/kube-trigger/pkg/source/types" "github.com/kubevela/kube-trigger/pkg/version" @@ -64,7 +65,10 @@ For example, $LOG_LEVEL can be used in place of --log-level Options have a priority like this: cli-flags > env > default-values` ) -var logger = logrus.WithField("kubetrigger", "main") +var ( + logger = logrus.WithField("kubetrigger", "main") + opt = newOption() +) // NewCommand news a command func NewCommand() *cobra.Command { @@ -77,8 +81,11 @@ func NewCommand() *cobra.Command { return nil }, } - addFlags(c.Flags()) c.AddCommand(newVersionCommand()) + addFlags(opt, c.Flags()) + if err := opt.validate(); err != nil { + panic(err) + } return c } @@ -95,32 +102,21 @@ func newVersionCommand() *cobra.Command { } //nolint:lll -func addFlags(f *pflag.FlagSet) { - f.StringP(FlagConfig, 
FlagConfigShort, defaultConfig, "Path to config file or directory. If a directory is provided, all files inside that directory will be combined together. Supported file formats are: json, yaml, and cue.") - f.String(FlagLogLevel, defaultLogLevel, "Log level") - f.Int(FlagQueueSize, defaultQueueSize, "Queue size for running actions, this is shared between all watchers") - f.Int(FlagWorkers, defaultWorkers, "Number of workers for running actions, this is shared between all watchers") - f.Int(FlagPerWorkerQPS, defaultPerWorkerQPS, "Long-term QPS limiting per worker, this is shared between all watchers") - f.Int(FlagMaxRetry, defaultMaxRetry, "Retry count after action failed, valid only when action retrying is enabled") - f.Int(FlagRetryDelay, defaultRetryDelay, "First delay to retry actions in seconds, subsequent delay will grow exponentially") - f.Int(FlagTimeout, defaultTimeout, "Timeout for running each action") - f.Int(FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions") +func addFlags(opt *option, f *pflag.FlagSet) { + f.StringVarP(&opt.Config, FlagConfig, FlagConfigShort, defaultConfig, "Path to config file or directory. If a directory is provided, all files inside that directory will be combined together. 
Supported file formats are: json, yaml, and cue.") + f.StringVar(&opt.LogLevel, FlagLogLevel, defaultLogLevel, "Log level") + f.IntVar(&opt.QueueSize, FlagQueueSize, defaultQueueSize, "Queue size for running actions, this is shared between all watchers") + f.IntVar(&opt.Workers, FlagWorkers, defaultWorkers, "Number of workers for running actions, this is shared between all watchers") + f.IntVar(&opt.PerWorkerQPS, FlagPerWorkerQPS, defaultPerWorkerQPS, "Long-term QPS limiting per worker, this is shared between all watchers") + f.IntVar(&opt.MaxRetry, FlagMaxRetry, defaultMaxRetry, "Retry count after action failed, valid only when action retrying is enabled") + f.IntVar(&opt.RetryDelay, FlagRetryDelay, defaultRetryDelay, "First delay to retry actions in seconds, subsequent delay will grow exponentially") + f.IntVar(&opt.Timeout, FlagTimeout, defaultTimeout, "Timeout for running each action") + f.IntVar(&opt.RegistrySize, FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions") + f.StringVar(&k8sresourcewatcher.MultiClusterConfigType, "multi-cluster-config-type", k8sresourcewatcher.TypeClusterGateway, "Multi-cluster config type, supported types: cluster-gateway, cluster-gateway-secret") } func runCli(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - var err error - - // Read options from env and cli, and fall back to defaults. - opt, err := newOption(). - withDefaults(). - withEnvVariables(). - withCliFlags(cmd.Flags()). - validate() - if err != nil { - return errors.Wrap(err, "error when paring flags") - } - // Set log level. No need to check error, we validated it previously.
level, _ := logrus.ParseLevel(opt.LogLevel) logrus.SetLevel(level) @@ -179,7 +175,7 @@ func runCli(cmd *cobra.Command, args []string) error { for _, instance := range instances { err := instance.Run(ctx) if err != nil { - logger.Fatalf("source %s failed to run", instance.Type()) + logger.Fatalf("source %s failed to run: %v", instance.Type(), err) return err } } diff --git a/pkg/cmd/options.go b/pkg/cmd/options.go index 7be1cd3..66f36a3 100644 --- a/pkg/cmd/options.go +++ b/pkg/cmd/options.go @@ -18,13 +18,10 @@ package cmd import ( "fmt" - "os" - "strconv" "time" "github.com/kubevela/kube-trigger/pkg/executor" "github.com/sirupsen/logrus" - "github.com/spf13/pflag" ) type option struct { @@ -57,139 +54,40 @@ const ( defaultRegistrySize = 100 ) -const ( - envStrLogLevel = "LOG_LEVEL" - envStrConfig = "CONFIG" - - envStrQueueSize = "QUEUE_SIZE" - envStrWorkers = "WORKERS" - envStrPerWorkerQPS = "PER_WORKER_QPS" - envStrMaxRetry = "MAX_RETRY" - envStrRetryDelay = "RETRY_DELAY" - envStrActionRetry = "ACTION_RETRY" - envStrTimeout = "TIMEOUT" - - envStrRegistrySize = "REGISTRY_SIZE" -) - func newOption() *option { return &option{} } -func (o *option) withDefaults() *option { - o.LogLevel = defaultLogLevel - o.Config = defaultConfig - o.QueueSize = defaultQueueSize - o.Workers = defaultWorkers - o.PerWorkerQPS = defaultPerWorkerQPS - o.MaxRetry = defaultMaxRetry - o.RetryDelay = defaultRetryDelay - o.ActionRetry = defaultActionRetry - o.Timeout = defaultTimeout - o.RegistrySize = defaultRegistrySize - return o -} - -//nolint:gocognit -func (o *option) withEnvVariables() *option { - if v, ok := os.LookupEnv(envStrLogLevel); ok && v != "" { - o.LogLevel = v - } - if v, ok := os.LookupEnv(envStrConfig); ok && v != "" { - o.Config = v - } - if v, ok := os.LookupEnv(envStrQueueSize); ok && v != "" { - o.QueueSize, _ = strconv.Atoi(v) - } - if v, ok := os.LookupEnv(envStrWorkers); ok && v != "" { - o.Workers, _ = strconv.Atoi(v) - } - if v, ok := 
os.LookupEnv(envStrPerWorkerQPS); ok && v != "" { - o.PerWorkerQPS, _ = strconv.Atoi(v) - } - if v, ok := os.LookupEnv(envStrMaxRetry); ok && v != "" { - o.MaxRetry, _ = strconv.Atoi(v) - } - if v, ok := os.LookupEnv(envStrRetryDelay); ok && v != "" { - o.RetryDelay, _ = strconv.Atoi(v) - } - if v, ok := os.LookupEnv(envStrActionRetry); ok && v != "" { - o.ActionRetry, _ = strconv.ParseBool(v) - } - if v, ok := os.LookupEnv(envStrTimeout); ok && v != "" { - o.Timeout, _ = strconv.Atoi(v) - } - if v, ok := os.LookupEnv(envStrRegistrySize); ok && v != "" { - o.RegistrySize, _ = strconv.Atoi(v) - } - return o -} - -//nolint:gocognit -func (o *option) withCliFlags(flags *pflag.FlagSet) *option { - if v, err := flags.GetString(FlagLogLevel); err == nil && flags.Changed(FlagLogLevel) { - o.LogLevel = v - } - if v, err := flags.GetString(FlagConfig); err == nil && flags.Changed(FlagConfig) { - o.Config = v - } - if v, err := flags.GetInt(FlagQueueSize); err == nil && flags.Changed(FlagQueueSize) { - o.QueueSize = v - } - if v, err := flags.GetInt(FlagWorkers); err == nil && flags.Changed(FlagWorkers) { - o.Workers = v - } - if v, err := flags.GetInt(FlagPerWorkerQPS); err == nil && flags.Changed(FlagPerWorkerQPS) { - o.PerWorkerQPS = v - } - if v, err := flags.GetInt(FlagMaxRetry); err == nil && flags.Changed(FlagMaxRetry) { - o.MaxRetry = v - } - if v, err := flags.GetInt(FlagRetryDelay); err == nil && flags.Changed(FlagRetryDelay) { - o.RetryDelay = v - } - if v, err := flags.GetBool(FlagActionRetry); err == nil && flags.Changed(FlagActionRetry) { - o.ActionRetry = v - } - if v, err := flags.GetInt(FlagTimeout); err == nil && flags.Changed(FlagTimeout) { - o.Timeout = v - } - if v, err := flags.GetInt(FlagRegistrySize); err == nil && flags.Changed(FlagRegistrySize) { - o.RegistrySize = v - } - return o -} - -func (o *option) validate() (*option, error) { +func (o *option) validate() error { _, err := logrus.ParseLevel(o.LogLevel) if err != nil { - return nil, err + 
return err } if o.Config == "" { - return nil, fmt.Errorf("%s not specified", FlagConfig) + return fmt.Errorf("%s not specified", FlagConfig) } if o.QueueSize <= 0 { - return nil, fmt.Errorf("%s must be greater than 0", FlagQueueSize) + return fmt.Errorf("%s must be greater than 0", FlagQueueSize) } if o.Workers <= 0 { - return nil, fmt.Errorf("%s must be greater than 0", FlagWorkers) + return fmt.Errorf("%s must be greater than 0", FlagWorkers) } if o.PerWorkerQPS <= 0 { - return nil, fmt.Errorf("%s must be greater than 0", FlagPerWorkerQPS) + return fmt.Errorf("%s must be greater than 0", FlagPerWorkerQPS) } if o.MaxRetry < 0 { - return nil, fmt.Errorf("%s must be greater or equal to 0", FlagMaxRetry) + return fmt.Errorf("%s must be greater or equal to 0", FlagMaxRetry) } if o.RetryDelay < 0 { - return nil, fmt.Errorf("%s must be greater or equal to 0", FlagRetryDelay) + return fmt.Errorf("%s must be greater or equal to 0", FlagRetryDelay) } if o.Timeout <= 0 { - return nil, fmt.Errorf("%s must be greater than 0", FlagTimeout) + return fmt.Errorf("%s must be greater than 0", FlagTimeout) } if o.RegistrySize <= 0 { - return nil, fmt.Errorf("%s must be greater than 0", FlagRegistrySize) + return fmt.Errorf("%s must be greater than 0", FlagRegistrySize) } - return o, nil + return nil } func (o *option) getExecutorConfig() executor.Config { diff --git a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go index 21d0766..7af294b 100644 --- a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go +++ b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go @@ -23,6 +23,7 @@ import ( "github.com/oam-dev/kubevela-core-api/apis/core.oam.dev/v1beta1" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -32,12 +33,9 @@ import ( 
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/kubevela/kube-trigger/api/v1alpha1" "github.com/kubevela/kube-trigger/pkg/eventhandler" @@ -51,10 +49,9 @@ var serverStartTime time.Time // Controller object type Controller struct { - logger *logrus.Entry - clientset kubernetes.Interface - queue workqueue.RateLimitingInterface - informer cache.SharedIndexInformer + logger *logrus.Entry + queue workqueue.RateLimitingInterface + informer cache.SharedIndexInformer eventHandlers []eventhandler.EventHandler sourceConf types.Config @@ -67,15 +64,8 @@ func init() { } // Setup prepares controllers -func Setup(ctx context.Context, conf *rest.Config, ctrlConf types.Config, eh []eventhandler.EventHandler) *Controller { - mapper, err := apiutil.NewDiscoveryRESTMapper(conf) - if err != nil { - logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Fatal(err) - } - kubeClient, err := kubernetes.NewForConfig(conf) - if err != nil { - logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Fatalf("Can not create kubernetes client: %v", err) - } +func Setup(ctx context.Context, cli dynamic.Interface, mapper meta.RESTMapper, ctrlConf types.Config, eh []eventhandler.EventHandler) *Controller { + logger := logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher) gv, err := schema.ParseGroupVersion(ctrlConf.APIVersion) if err != nil { logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Fatal(err) @@ -86,23 +76,20 @@ func Setup(ctx context.Context, conf *rest.Config, ctrlConf types.Config, eh []e if err != nil { logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Fatal(err) } - dynamicClient, err := dynamic.NewForConfig(conf) - if err != nil { - logrus.WithField("source", 
v1alpha1.SourceTypeResourceWatcher).Fatal(err) - } + informer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if len(ctrlConf.MatchingLabels) > 0 { options.LabelSelector = labels.FormatLabels(ctrlConf.MatchingLabels) } - return dynamicClient.Resource(mapping.Resource).Namespace(ctrlConf.Namespace).List(ctx, options) + return cli.Resource(mapping.Resource).Namespace(ctrlConf.Namespace).List(ctx, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if len(ctrlConf.MatchingLabels) > 0 { options.LabelSelector = labels.FormatLabels(ctrlConf.MatchingLabels) } - return dynamicClient.Resource(mapping.Resource).Namespace(ctrlConf.Namespace).Watch(ctx, options) + return cli.Resource(mapping.Resource).Namespace(ctrlConf.Namespace).Watch(ctx, options) }, }, &unstructured.Unstructured{}, @@ -110,11 +97,7 @@ func Setup(ctx context.Context, conf *rest.Config, ctrlConf types.Config, eh []e cache.Indexers{}, ) - c := newResourceController( - kubeClient, - informer, - ctrlConf.Kind, - ) + c := newResourceController(logger, informer, ctrlConf.Kind) // precheck -> c.sourceConf = ctrlConf c.eventHandlers = eh @@ -130,7 +113,7 @@ func Setup(ctx context.Context, conf *rest.Config, ctrlConf types.Config, eh []e return c } -func newResourceController(client kubernetes.Interface, informer cache.SharedIndexInformer, kind string) *Controller { +func newResourceController(logger *logrus.Entry, informer cache.SharedIndexInformer, kind string) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) var newEvent types.InformerEvent var err error @@ -139,7 +122,7 @@ func newResourceController(client kubernetes.Interface, informer cache.SharedInd newEvent.Type = types.EventTypeCreate newEvent.EventObj = obj meta := utils.GetObjectMetaData(obj) - logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Tracef("received add event: %v %s/%s", kind, 
meta.GetName(), meta.GetNamespace()) + logger.Tracef("received add event: %v %s/%s", kind, meta.GetName(), meta.GetNamespace()) if err == nil { queue.Add(newEvent) } @@ -148,7 +131,7 @@ func newResourceController(client kubernetes.Interface, informer cache.SharedInd newEvent.Type = types.EventTypeUpdate newEvent.EventObj = new meta := utils.GetObjectMetaData(new) - logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Tracef("received update event: %v %s/%s", kind, meta.GetName(), meta.GetNamespace()) + logger.Tracef("received update event: %v %s/%s", kind, meta.GetName(), meta.GetNamespace()) if err == nil { queue.Add(newEvent) } @@ -157,7 +140,7 @@ func newResourceController(client kubernetes.Interface, informer cache.SharedInd newEvent.Type = types.EventTypeDelete newEvent.EventObj = obj meta := utils.GetObjectMetaData(obj) - logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher).Tracef("received delete event: %v %s/%s", kind, meta.GetName(), meta.GetNamespace()) + logger.Tracef("received delete event: %v %s/%s", kind, meta.GetName(), meta.GetNamespace()) if err == nil { queue.Add(newEvent) } @@ -165,10 +148,9 @@ func newResourceController(client kubernetes.Interface, informer cache.SharedInd }) return &Controller{ - logger: logrus.WithField("source", v1alpha1.SourceTypeResourceWatcher), - clientset: client, - informer: informer, - queue: queue, + logger: logger, + informer: informer, + queue: queue, } } diff --git a/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go b/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go index 9cfc63a..e55c872 100644 --- a/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go +++ b/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go @@ -19,26 +19,44 @@ package k8sresourcewatcher import ( "context" "encoding/json" + "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/api/meta" 
"k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/kubevela/kube-trigger/api/v1alpha1" "github.com/kubevela/kube-trigger/pkg/eventhandler" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/controller" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/types" sourcetypes "github.com/kubevela/kube-trigger/pkg/source/types" + "github.com/kubevela/pkg/multicluster" + "github.com/kubevela/pkg/util/singleton" +) + +func init() { + cfg := singleton.KubeConfig.Get() + cfg.Wrap(multicluster.NewTransportWrapper()) + singleton.KubeConfig.Set(cfg) + singleton.ReloadClients() +} + +var ( + MultiClusterConfigType string ) const ( + TypeClusterGateway string = "cluster-gateway" + TypeClusterGatewaySecret string = "cluster-gateway-secret" + clusterLabel string = "cluster.core.oam.dev/cluster-credential-type" defaultCluster string = "local" clusterCertKey string = "tls.key" @@ -102,59 +120,96 @@ func (w *K8sResourceWatcher) Init(properties *runtime.RawExtension, eh eventhand } func (w *K8sResourceWatcher) Run(ctx context.Context) error { + clusterGetter, err := NewMultiClustersGetter(MultiClusterConfigType) + if err != nil { + return err + } for k, config := range w.configs { - configs, err := getConfigFromSecret(ctx, config.Clusters) - if err != nil { - return err + if len(config.Clusters) == 0 { + config.Clusters = []string{defaultCluster} } - for _, kubeConfig := range configs { - go func(kube *rest.Config, c *types.Config, handlers []eventhandler.EventHandler) { - resourceController := controller.Setup(ctx, kube, *c, handlers) - resourceController.Run(ctx.Done()) - }(kubeConfig, config, w.eventHandlers[k]) + for _, cluster := range config.Clusters { + cli, mapper, err := 
clusterGetter.GetDynamicClientAndMapper(ctx, cluster) + if err != nil { + return err + } + multiCtx := multicluster.WithCluster(ctx, cluster) + go func(multiCtx context.Context, cli dynamic.Interface, mapper meta.RESTMapper, c *types.Config, handlers []eventhandler.EventHandler) { + resourceController := controller.Setup(multiCtx, cli, mapper, *c, handlers) + resourceController.Run(multiCtx.Done()) + }(multiCtx, cli, mapper, config, w.eventHandlers[k]) } } return nil } -func getConfigFromSecret(ctx context.Context, clusters []string) ([]*rest.Config, error) { - configs := make([]*rest.Config, 0) +func (w *K8sResourceWatcher) Type() string { + return v1alpha1.SourceTypeResourceWatcher +} + +type MultiClustersGetter interface { + GetDynamicClientAndMapper(ctx context.Context, cluster string) (dynamic.Interface, meta.RESTMapper, error) +} + +func NewMultiClustersGetter(typ string) (MultiClustersGetter, error) { config := ctrl.GetConfigOrDie() cli, err := client.New(config, client.Options{Scheme: scheme.Scheme}) if err != nil { return nil, err } - if len(clusters) == 0 { - req, err := labels.NewRequirement(clusterLabel, selection.Exists, nil) - if err != nil { - return nil, err - } - secrets := &corev1.SecretList{} - if err := cli.List(ctx, secrets, client.MatchingLabelsSelector{labels.NewSelector().Add(*req)}); err != nil { - return nil, err - } - for _, secret := range secrets.Items { - configs = append(configs, generateRestConfig(&secret)) - } - configs = append(configs, config) - return configs, nil + switch typ { + case TypeClusterGateway: + return &clusterGatewayGetter{}, nil + case TypeClusterGatewaySecret: + return &clusterGatewaySecretGetter{cli: cli, config: config}, nil + default: + return nil, fmt.Errorf("unknown multi-cluster getter type %s", typ) } - for _, cluster := range clusters { - if cluster == defaultCluster { - configs = append(configs, config) - continue - } - secret := &corev1.Secret{} - if err := cli.Get(ctx, client.ObjectKey{Name: cluster, 
Namespace: "vela-system"}, secret); err != nil { - return nil, err - } - configs = append(configs, generateRestConfig(secret)) +} + +type clusterGatewayGetter struct{} + +func (c *clusterGatewayGetter) GetDynamicClientAndMapper(ctx context.Context, cluster string) (dynamic.Interface, meta.RESTMapper, error) { + return singleton.DynamicClient.Get(), singleton.RESTMapper.Get(), nil +} + +type clusterGatewaySecretGetter struct { + cli client.Client + config *rest.Config +} + +func (c *clusterGatewaySecretGetter) GetDynamicClientAndMapper(ctx context.Context, cluster string) (dynamic.Interface, meta.RESTMapper, error) { + if cluster == defaultCluster { + return c.getDynamicClientAndMapperFromConfig(ctx, c.config) + } + config, err := c.getRestConfigFromSecret(ctx, cluster) + if err != nil { + return nil, nil, err + } + + return c.getDynamicClientAndMapperFromConfig(ctx, config) +} + +func (c *clusterGatewaySecretGetter) getDynamicClientAndMapperFromConfig(ctx context.Context, config *rest.Config) (dynamic.Interface, meta.RESTMapper, error) { + cli, err := dynamic.NewForConfig(config) + if err != nil { + return nil, nil, err } - return configs, nil + mapper, err := apiutil.NewDynamicRESTMapper(config) + if err != nil { + return nil, nil, err + } + return cli, mapper, nil } -func generateRestConfig(secret *corev1.Secret) *rest.Config { - c := &rest.Config{ +func (c *clusterGatewaySecretGetter) getRestConfigFromSecret(ctx context.Context, cluster string) (*rest.Config, error) { + secret := &corev1.Secret{} + if err := c.cli.Get(ctx, client.ObjectKey{Name: cluster, Namespace: "vela-system"}, secret); err != nil { + return nil, err + } + // currently we only type X509 + // TODO: support other types + conf := &rest.Config{ Host: string(secret.Data[clusterEndpoint]), TLSClientConfig: rest.TLSClientConfig{ KeyData: secret.Data[clusterCertKey], @@ -162,13 +217,9 @@ func generateRestConfig(secret *corev1.Secret) *rest.Config { }, } if ca, ok := secret.Data[clusterCAData]; ok { - 
c.TLSClientConfig.CAData = ca + conf.TLSClientConfig.CAData = ca } else { - c.Insecure = true + conf.Insecure = true } - return c -} - -func (w *K8sResourceWatcher) Type() string { - return v1alpha1.SourceTypeResourceWatcher + return conf, nil } diff --git a/pkg/templates/static/worker/default.cue b/pkg/templates/static/worker/default.cue index e106cf3..0d8fdc0 100644 --- a/pkg/templates/static/worker/default.cue +++ b/pkg/templates/static/worker/default.cue @@ -44,6 +44,7 @@ deployment: { "--timeout=\(parameter.config.timeout)", "--workers=\(parameter.config.workers)", "--log-level=\(parameter.config.logLevel)", + "--multi-cluster-config-type=\(parameter.config.multiClusterConfigType)", ] image: parameter.image name: "kube-trigger" @@ -101,12 +102,13 @@ parameter: { } serviceAccount: *"kube-trigger" | string config: { - maxRetry: *5 | int - retryDelay: *2 | int - perWorkerQPS: *2 | int - queueSize: *50 | int - timeout: *10 | int - workers: *4 | int - logLevel: *"info" | "debug" + maxRetry: *5 | int + retryDelay: *2 | int + perWorkerQPS: *2 | int + queueSize: *50 | int + timeout: *10 | int + workers: *4 | int + logLevel: *"info" | "debug" + multiClusterConfigType: *"cluster-gateway" | "cluster-gateway-secret" } }