Skip to content

Commit

Permalink
Feat: support cluster gateway for multi cluster (#32)
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong authored Feb 9, 2023
1 parent bd94cb6 commit 3125442
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 252 deletions.
3 changes: 0 additions & 3 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

standardv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
"github.com/kubevela/kube-trigger/controllers/config"
"github.com/kubevela/kube-trigger/controllers/triggerservice"
)

Expand All @@ -51,7 +50,6 @@ func main() {
metricsAddr string
enableLeaderElection bool
probeAddr string
controllerConfig config.Config
)

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
Expand Down Expand Up @@ -95,7 +93,6 @@ func main() {
if err = (&triggerservice.Reconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Config: controllerConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TriggerService")
os.Exit(1)
Expand Down
21 changes: 0 additions & 21 deletions controllers/config/config.go

This file was deleted.

2 changes: 0 additions & 2 deletions controllers/triggerservice/triggerservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

standardv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
"github.com/kubevela/kube-trigger/controllers/config"
"github.com/kubevela/kube-trigger/controllers/utils"
"github.com/kubevela/kube-trigger/pkg/templates"
"github.com/kubevela/pkg/cue/cuex"
Expand All @@ -47,7 +46,6 @@ import (
type Reconciler struct {
client.Client
Scheme *runtime.Scheme
Config config.Config
}

var (
Expand Down
46 changes: 21 additions & 25 deletions pkg/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubevela/kube-trigger/pkg/config"
"github.com/kubevela/kube-trigger/pkg/eventhandler"
"github.com/kubevela/kube-trigger/pkg/executor"
"github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher"
sourceregistry "github.com/kubevela/kube-trigger/pkg/source/registry"
"github.com/kubevela/kube-trigger/pkg/source/types"
"github.com/kubevela/kube-trigger/pkg/version"
Expand Down Expand Up @@ -64,7 +65,10 @@ For example, $LOG_LEVEL can be used in place of --log-level
Options have a priority like this: cli-flags > env > default-values`
)

var logger = logrus.WithField("kubetrigger", "main")
var (
logger = logrus.WithField("kubetrigger", "main")
opt = newOption()
)

// NewCommand news a command
func NewCommand() *cobra.Command {
Expand All @@ -77,8 +81,11 @@ func NewCommand() *cobra.Command {
return nil
},
}
addFlags(c.Flags())
c.AddCommand(newVersionCommand())
addFlags(opt, c.Flags())
if err := opt.validate(); err != nil {
panic(err)
}
return c
}

Expand All @@ -95,32 +102,21 @@ func newVersionCommand() *cobra.Command {
}

//nolint:lll
func addFlags(f *pflag.FlagSet) {
f.StringP(FlagConfig, FlagConfigShort, defaultConfig, "Path to config file or directory. If a directory is provided, all files inside that directory will be combined together. Supported file formats are: json, yaml, and cue.")
f.String(FlagLogLevel, defaultLogLevel, "Log level")
f.Int(FlagQueueSize, defaultQueueSize, "Queue size for running actions, this is shared between all watchers")
f.Int(FlagWorkers, defaultWorkers, "Number of workers for running actions, this is shared between all watchers")
f.Int(FlagPerWorkerQPS, defaultPerWorkerQPS, "Long-term QPS limiting per worker, this is shared between all watchers")
f.Int(FlagMaxRetry, defaultMaxRetry, "Retry count after action failed, valid only when action retrying is enabled")
f.Int(FlagRetryDelay, defaultRetryDelay, "First delay to retry actions in seconds, subsequent delay will grow exponentially")
f.Int(FlagTimeout, defaultTimeout, "Timeout for running each action")
f.Int(FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions")
func addFlags(opt *option, f *pflag.FlagSet) {
f.StringVarP(&opt.Config, FlagConfig, FlagConfigShort, defaultConfig, "Path to config file or directory. If a directory is provided, all files inside that directory will be combined together. Supported file formats are: json, yaml, and cue.")
f.StringVar(&opt.LogLevel, FlagLogLevel, defaultLogLevel, "Log level")
f.IntVar(&opt.QueueSize, FlagQueueSize, defaultQueueSize, "Queue size for running actions, this is shared between all watchers")
f.IntVar(&opt.Workers, FlagWorkers, defaultWorkers, "Number of workers for running actions, this is shared between all watchers")
f.IntVar(&opt.PerWorkerQPS, FlagPerWorkerQPS, defaultPerWorkerQPS, "Long-term QPS limiting per worker, this is shared between all watchers")
f.IntVar(&opt.MaxRetry, FlagMaxRetry, defaultMaxRetry, "Retry count after action failed, valid only when action retrying is enabled")
f.IntVar(&opt.RetryDelay, FlagRetryDelay, defaultRetryDelay, "First delay to retry actions in seconds, subsequent delay will grow exponentially")
f.IntVar(&opt.Timeout, FlagTimeout, defaultTimeout, "Timeout for running each action")
f.IntVar(&opt.RegistrySize, FlagRegistrySize, defaultRegistrySize, "Cache size for filters and actions")
f.StringVar(&k8sresourcewatcher.MultiClusterConfigType, "multi-cluster-config-type", k8sresourcewatcher.TypeClusterGateway, "Multi-cluster config type, supported types: cluster-gateway, cluster-gateway-kubeconfig")
}

func runCli(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
var err error

// Read options from env and cli, and fall back to defaults.
opt, err := newOption().
withDefaults().
withEnvVariables().
withCliFlags(cmd.Flags()).
validate()
if err != nil {
return errors.Wrap(err, "error when paring flags")
}

// Set log level. No need to check error, we validated it previously.
level, _ := logrus.ParseLevel(opt.LogLevel)
logrus.SetLevel(level)
Expand Down Expand Up @@ -179,7 +175,7 @@ func runCli(cmd *cobra.Command, args []string) error {
for _, instance := range instances {
err := instance.Run(ctx)
if err != nil {
logger.Fatalf("source %s failed to run", instance.Type())
logger.Fatalf("source %s failed to run: %v", instance.Type(), err)
return err
}
}
Expand Down
124 changes: 11 additions & 113 deletions pkg/cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ package cmd

import (
"fmt"
"os"
"strconv"
"time"

"github.com/kubevela/kube-trigger/pkg/executor"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
)

type option struct {
Expand Down Expand Up @@ -57,139 +54,40 @@ const (
defaultRegistrySize = 100
)

const (
envStrLogLevel = "LOG_LEVEL"
envStrConfig = "CONFIG"

envStrQueueSize = "QUEUE_SIZE"
envStrWorkers = "WORKERS"
envStrPerWorkerQPS = "PER_WORKER_QPS"
envStrMaxRetry = "MAX_RETRY"
envStrRetryDelay = "RETRY_DELAY"
envStrActionRetry = "ACTION_RETRY"
envStrTimeout = "TIMEOUT"

envStrRegistrySize = "REGISTRY_SIZE"
)

func newOption() *option {
return &option{}
}

func (o *option) withDefaults() *option {
o.LogLevel = defaultLogLevel
o.Config = defaultConfig
o.QueueSize = defaultQueueSize
o.Workers = defaultWorkers
o.PerWorkerQPS = defaultPerWorkerQPS
o.MaxRetry = defaultMaxRetry
o.RetryDelay = defaultRetryDelay
o.ActionRetry = defaultActionRetry
o.Timeout = defaultTimeout
o.RegistrySize = defaultRegistrySize
return o
}

//nolint:gocognit
func (o *option) withEnvVariables() *option {
if v, ok := os.LookupEnv(envStrLogLevel); ok && v != "" {
o.LogLevel = v
}
if v, ok := os.LookupEnv(envStrConfig); ok && v != "" {
o.Config = v
}
if v, ok := os.LookupEnv(envStrQueueSize); ok && v != "" {
o.QueueSize, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrWorkers); ok && v != "" {
o.Workers, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrPerWorkerQPS); ok && v != "" {
o.PerWorkerQPS, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrMaxRetry); ok && v != "" {
o.MaxRetry, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrRetryDelay); ok && v != "" {
o.RetryDelay, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrActionRetry); ok && v != "" {
o.ActionRetry, _ = strconv.ParseBool(v)
}
if v, ok := os.LookupEnv(envStrTimeout); ok && v != "" {
o.Timeout, _ = strconv.Atoi(v)
}
if v, ok := os.LookupEnv(envStrRegistrySize); ok && v != "" {
o.RegistrySize, _ = strconv.Atoi(v)
}
return o
}

//nolint:gocognit
func (o *option) withCliFlags(flags *pflag.FlagSet) *option {
if v, err := flags.GetString(FlagLogLevel); err == nil && flags.Changed(FlagLogLevel) {
o.LogLevel = v
}
if v, err := flags.GetString(FlagConfig); err == nil && flags.Changed(FlagConfig) {
o.Config = v
}
if v, err := flags.GetInt(FlagQueueSize); err == nil && flags.Changed(FlagQueueSize) {
o.QueueSize = v
}
if v, err := flags.GetInt(FlagWorkers); err == nil && flags.Changed(FlagWorkers) {
o.Workers = v
}
if v, err := flags.GetInt(FlagPerWorkerQPS); err == nil && flags.Changed(FlagPerWorkerQPS) {
o.PerWorkerQPS = v
}
if v, err := flags.GetInt(FlagMaxRetry); err == nil && flags.Changed(FlagMaxRetry) {
o.MaxRetry = v
}
if v, err := flags.GetInt(FlagRetryDelay); err == nil && flags.Changed(FlagRetryDelay) {
o.RetryDelay = v
}
if v, err := flags.GetBool(FlagActionRetry); err == nil && flags.Changed(FlagActionRetry) {
o.ActionRetry = v
}
if v, err := flags.GetInt(FlagTimeout); err == nil && flags.Changed(FlagTimeout) {
o.Timeout = v
}
if v, err := flags.GetInt(FlagRegistrySize); err == nil && flags.Changed(FlagRegistrySize) {
o.RegistrySize = v
}
return o
}

func (o *option) validate() (*option, error) {
func (o *option) validate() error {
_, err := logrus.ParseLevel(o.LogLevel)
if err != nil {
return nil, err
return err
}
if o.Config == "" {
return nil, fmt.Errorf("%s not specified", FlagConfig)
return fmt.Errorf("%s not specified", FlagConfig)
}
if o.QueueSize <= 0 {
return nil, fmt.Errorf("%s must be greater than 0", FlagQueueSize)
return fmt.Errorf("%s must be greater than 0", FlagQueueSize)
}
if o.Workers <= 0 {
return nil, fmt.Errorf("%s must be greater than 0", FlagWorkers)
return fmt.Errorf("%s must be greater than 0", FlagWorkers)
}
if o.PerWorkerQPS <= 0 {
return nil, fmt.Errorf("%s must be greater than 0", FlagPerWorkerQPS)
return fmt.Errorf("%s must be greater than 0", FlagPerWorkerQPS)
}
if o.MaxRetry < 0 {
return nil, fmt.Errorf("%s must be greater or equal to 0", FlagMaxRetry)
return fmt.Errorf("%s must be greater or equal to 0", FlagMaxRetry)
}
if o.RetryDelay < 0 {
return nil, fmt.Errorf("%s must be greater or equal to 0", FlagRetryDelay)
return fmt.Errorf("%s must be greater or equal to 0", FlagRetryDelay)
}
if o.Timeout <= 0 {
return nil, fmt.Errorf("%s must be greater than 0", FlagTimeout)
return fmt.Errorf("%s must be greater than 0", FlagTimeout)
}
if o.RegistrySize <= 0 {
return nil, fmt.Errorf("%s must be greater than 0", FlagRegistrySize)
return fmt.Errorf("%s must be greater than 0", FlagRegistrySize)
}
return o, nil
return nil
}

func (o *option) getExecutorConfig() executor.Config {
Expand Down
Loading

0 comments on commit 3125442

Please sign in to comment.