diff --git a/operator/api/v1alpha1/connector_types.go b/operator/api/v1alpha1/connector_types.go index c7829bcf..ec15d993 100644 --- a/operator/api/v1alpha1/connector_types.go +++ b/operator/api/v1alpha1/connector_types.go @@ -20,7 +20,6 @@ import ( "fmt" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -31,27 +30,52 @@ type ConnectorSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - ConnectorType Type `json:"type"` // The image uri of your connector Image string `json:"image,omitempty"` // Specify the container in detail if needed Containers []v1.Container `json:"containers,omitempty"` - Scaler *ScalerSpec `json:"scalerSpec,omitempty"` + ExposePort *int32 `json:"exposePort"` + + ConfigRef string `json:"configRef,omitempty"` + + SecretRef string `json:"secretRef,omitempty"` + + ScalingRule *ScalingRule `json:"scalingRule,omitempty"` } -type ScalerSpec struct { - // +optional - CheckInterval *int32 `json:"checkInterval,omitempty"` - // +optional - CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"` +type ScalingRule struct { // +optional MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"` // +optional MinReplicaCount *int32 `json:"minReplicaCount,omitempty"` + // +optional + CustomScaling *CustomScaling `json:"customScaling,omitempty"` + // +optional + HTTPScaling *HTTPScaling `json:"httpScaling,omitempty"` +} - Metadata map[string]intstr.IntOrString `json:"metadata,omitempty"` - ScalerSecret string `json:"scalerSecret,omitempty"` +type HTTPScaling struct { + Host string `json:"host"` + // +optional + SvcType string `json:"svcType,omitempty"` + // +optional + PendingRequests int32 `json:"pendingRequests,omitempty" description:"The target metric value for the HPA (Default 100)"` +} + +type CustomScaling struct { + // +optional + CheckInterval 
*int32 `json:"checkInterval,omitempty"` + // +optional + CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"` + + Triggers []Trigger `json:"triggers"` +} + +type Trigger struct { + Type string `json:"type"` + Metadata map[string]string `json:"metadata"` + SecretRef string `json:"secretRef,omitempty"` } // ConnectorStatus defines the observed state of Connector @@ -118,55 +142,29 @@ type ConnectorList struct { Items []Connector `json:"items"` } +type TriggerType string + +const ( + MySQL TriggerType = "mysql" + SQS TriggerType = "aws-sqs-queue" +) + type ResourceType string const ( DeployResType ResourceType = "deployment" SvcResType ResourceType = "service" + CLSvcResType ResourceType = "cl-service" + LBSvcResType ResourceType = "lb-service" HttpSOResType ResourceType = "httpScaledObject" SoResType ResourceType = "scaledObject" TAResType ResourceType = "triggerAuthentication" ) -// Type describes the pattern the image uses to obtain data and reflects to a corresponding scaler. -// +enum -type Type string - -const ( - // Http type means that the image is a webserver waiting data to be pushed to it. - // This type also reflects to a http scaler. - Http Type = "http" - // ActiveMQ type means that the image fetches data from an ActiveMQ queue - // This type also reflects to a ActiveMQ scaler. - ActiveMQ Type = "activemq" - // ArtemisQue type means that the image fetches data from an ActiveMQ Artemis queue - // This type also reflects to an artemis-queue scaler. - ArtemisQue Type = "artemis-queue" - // Kafka type means that the image fetches data from an Apache Kafka topic - // This type also reflects to a Kafka scaler. - Kafka Type = "kafka" - // AWSCloudwatch type means that the image fetches data from AWS Cloudwatch - // This type also reflects to a AWSCloudwatch scaler. - AWSCloudwatch Type = "aws-cloudwatch" - // AWSKinesisStream type means that the image fetches data from AWS Kinesis Stream - // This type also reflects to a AWSKinesisStream scaler. 
- AWSKinesisStream Type = "aws-kinesis-stream" - // AWSSqsQueue type means that the image fetches data from AWS Sqs Queue - // This type also reflects to a AWSSqsQueue scaler. - AWSSqsQueue Type = "aws-sqs-queue" - // AZUREAppInsights type means that the image fetches data from Azure Application Insights - // This type also reflects to a AZUREAppInsights scaler. - AZUREAppInsights Type = "azure-app-insights" - // Rabbitmq type means that the image fetches data from Rabbitmq - // This type also reflects to a Rabbitmq scaler. - Rabbitmq Type = "rabbitmq" -) - func (in *Connector) String() string { - return fmt.Sprintf("Image [%s], Mode [%s]", - in.Spec.Image, - in.Spec.ConnectorType) + return fmt.Sprintf("Image [%s]", + in.Spec.Image) } func init() { diff --git a/operator/config/crd/bases/vance.io_connectors.yaml b/operator/config/crd/bases/vance.io_connectors.yaml index 4cee1db8..779e85c7 100644 --- a/operator/config/crd/bases/vance.io_connectors.yaml +++ b/operator/config/crd/bases/vance.io_connectors.yaml @@ -35,6 +35,8 @@ spec: spec: description: ConnectorSpec defines the desired state of Connector properties: + configRef: + type: string containers: description: Specify the container in detail if needed items: @@ -1252,39 +1254,64 @@ spec: - name type: object type: array + exposePort: + format: int32 + type: integer image: description: The image uri of your connector type: string - scalerSpec: + scalingRule: properties: - checkInterval: - format: int32 - type: integer - cooldownPeriod: - format: int32 - type: integer + customScaling: + properties: + checkInterval: + format: int32 + type: integer + cooldownPeriod: + format: int32 + type: integer + triggers: + items: + properties: + metadata: + additionalProperties: + type: string + type: object + secretRef: + type: string + type: + type: string + required: + - metadata + - type + type: object + type: array + required: + - triggers + type: object + httpScaling: + properties: + host: + type: string + 
pendingRequests: + format: int32 + type: integer + svcType: + type: string + required: + - host + type: object maxReplicaCount: format: int32 type: integer - metadata: - additionalProperties: - anyOf: - - type: integer - - type: string - x-kubernetes-int-or-string: true - type: object minReplicaCount: format: int32 type: integer - scalerSecret: - type: string type: object - type: - description: Type describes the pattern the image uses to obtain data - and reflects to a corresponding scaler. + secretRef: type: string required: - - type + - exposePort type: object status: description: ConnectorStatus defines the observed state of Connector diff --git a/operator/config/samples/rabbitmq_sample.yaml b/operator/config/samples/rabbitmq_sample.yaml index 1d78e3e8..ea1da08f 100644 --- a/operator/config/samples/rabbitmq_sample.yaml +++ b/operator/config/samples/rabbitmq_sample.yaml @@ -5,12 +5,13 @@ metadata: data: host: YW1xcDovL3VzZXI6UEFTU1dPUkRAcmFiYml0bXEuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbDo1Njcy --- -apiVersion: cloud.vance/v1alpha1 +apiVersion: vance.io/v1alpha1 kind: Connector metadata: - name: rabbitmq-connector-sample + name: rabbitmq-sample spec: - type: rabbitmq + # TODO(user): Add fields here + exposePort: 8080 containers: - name: rabbitmq-connector-sample image: jeffhollan/rabbitmq-client:dev @@ -19,11 +20,13 @@ spec: - receive args: - "amqp://user:PASSWORD@rabbitmq.default.svc.cluster.local:5672" - scalerSpec: - checkInterval: 5 - cooldownPeriod: 5 - metadata: - secret: rabbitmq-consumer-secret - queueName: hello - mode: QueueLength - value: "50" + secretRef: rabbitmq-consumer-secret + scalingRule: + customScaling: + triggers: + - type: rabbitmq + metadata: + queueName: hello + mode: QueueLength + value: "50" + secretRef: rabbitmq-consumer-secret \ No newline at end of file diff --git a/operator/config/samples/webhook_sample.yaml b/operator/config/samples/webhook_sample.yaml index 38f0e0be..6705026f 100644 --- a/operator/config/samples/webhook_sample.yaml +++ 
b/operator/config/samples/webhook_sample.yaml @@ -4,11 +4,10 @@ metadata: name: webhook-sample spec: # TODO(user): Add fields here - type: http image: tomcat:8.0.18-jre8 - scalerSpec: - metadata: + exposePort: 8080 + scalingRule: + minReplicaCount: 0 + httpScaling: host: myhost.com - svcPort: 8080 - maxReplica: 30 - pendingRequests: 20 + pendingRequests: 10 \ No newline at end of file diff --git a/operator/controllers/connector_controller.go b/operator/controllers/connector_controller.go index 5ad48b4b..ce9a0aaa 100644 --- a/operator/controllers/connector_controller.go +++ b/operator/controllers/connector_controller.go @@ -78,25 +78,34 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } logger.Error(err, "Getting connector failed") - // 返回错误信息给外部 + // return error to outside return ctrl.Result{}, err } + // create Deployment for the connector. if err = createOrUpdateAPIResources(ctx, r, connector, vance.DeployResType); err != nil { return ctrl.Result{}, err } - if connector.Spec.Scaler == nil { - logger.Info("scaler is nil") + // create a cluster svc for the connector + if err = createOrUpdateAPIResources(ctx, r, connector, vance.CLSvcResType); err != nil { + return ctrl.Result{}, err + } + if connector.Spec.ScalingRule == nil { + logger.Info("ScalingRule is nil") } else { - logger.Info("scaler is not nil") - // build a http scaler if the type is http, otherwise build other scalers - if connector.Spec.ConnectorType == vance.Http { - if err = createOrUpdateAPIResources(ctx, r, connector, vance.SvcResType); err != nil { - return ctrl.Result{}, err + logger.Info("ScalingRule is not nil") + + if connector.Spec.ScalingRule.HTTPScaling != nil { + // set service type to LB and create an ingress for it + if connector.Spec.ScalingRule.HTTPScaling.SvcType == "LB" { + if err = createOrUpdateAPIResources(ctx, r, connector, vance.LBSvcResType); err != nil { + return ctrl.Result{}, err + } } + if err = createOrUpdateAPIResources(ctx, r, 
connector, vance.HttpSOResType); err != nil { return ctrl.Result{}, err } - } else { + } else if connector.Spec.ScalingRule.CustomScaling != nil { if err = createOrUpdateAPIResources(ctx, r, connector, vance.SoResType); err != nil { return ctrl.Result{}, err } diff --git a/operator/controllers/other_resources.go b/operator/controllers/other_resources.go index e10e3477..dc3f6812 100644 --- a/operator/controllers/other_resources.go +++ b/operator/controllers/other_resources.go @@ -2,7 +2,6 @@ package controllers import ( "context" - "errors" "github.com/go-logr/logr" kedahttp "github.com/kedacore/http-add-on/operator/api/v1alpha1" keda "github.com/kedacore/keda/v2/apis/keda/v1alpha1" @@ -33,9 +32,13 @@ func createOrUpdateAPIResources( { return createOrUpdateScaledObject(ctx, r, connector) } - case vance.SvcResType: + case vance.CLSvcResType: + { + return createOrUpdateService(ctx, r, connector, vance.CLSvcResType) + } + case vance.LBSvcResType: { - return createOrUpdateService(ctx, r, connector) + return createOrUpdateService(ctx, r, connector, vance.LBSvcResType) } case vance.HttpSOResType: { @@ -54,31 +57,25 @@ func createOrUpdateHttpScaledObject( "httpScaledObject name", connector.Name, "httpScaledObject namespace", connector.Namespace) logger.Info(" Start creating or updating a httpScaledObject ") - var svcPort int32 - if svcPort = connector.Spec.Scaler.Metadata["svcPort"].IntVal; svcPort == 0 { - err := errors.New("Error: missing required field of metadata . ") - logger.Error(err, err.Error()) - return err - } - var host string - if host = connector.Spec.Scaler.Metadata["host"].StrVal; host == "" { - err := errors.New("Error: missing required field of metadata . 
") - logger.Error(err, err.Error()) - return err - } + var svcPort int32 = *connector.Spec.ExposePort httpso := k8s2.CreateHttpScaledObject( connector.Namespace, connector.Name, - host, + connector.Spec.ScalingRule.HTTPScaling.Host, svcPort) - if minReplica := connector.Spec.Scaler.Metadata["minReplica"].IntVal; minReplica != 0 { - httpso.Spec.Replicas.Min = minReplica + if connector.Spec.ScalingRule.MinReplicaCount != nil { + httpso.Spec.Replicas.Min = *connector.Spec.ScalingRule.MinReplicaCount + } else { + httpso.Spec.Replicas.Min = 0 } - if maxReplica := connector.Spec.Scaler.Metadata["maxReplica"].IntVal; maxReplica != 0 { - httpso.Spec.Replicas.Max = maxReplica + if connector.Spec.ScalingRule.MaxReplicaCount != nil { + httpso.Spec.Replicas.Max = *connector.Spec.ScalingRule.MaxReplicaCount + } else { + httpso.Spec.Replicas.Max = 10 } - if pendingRequests := connector.Spec.Scaler.Metadata["pendingRequests"].IntVal; pendingRequests != 0 { - httpso.Spec.TargetPendingRequests = pendingRequests + + if connector.Spec.ScalingRule.HTTPScaling.PendingRequests != 0 { + httpso.Spec.TargetPendingRequests = connector.Spec.ScalingRule.HTTPScaling.PendingRequests } if err := controllerutil.SetControllerReference(connector, httpso, r.Scheme); err != nil { logger.Error(err, "Set service ControllerReference error") @@ -90,28 +87,37 @@ func createOrUpdateHttpScaledObject( } return nil } + +// createOrUpdateService is used to generate SVCs for HTTP Scaling rules func createOrUpdateService( ctx context.Context, r *ConnectorReconciler, - connector *vance.Connector) error { + connector *vance.Connector, + resourceType vance.ResourceType, +) error { logger := r.Log.WithValues( "service name", connector.Name, "service namespace", connector.Namespace) logger.Info(" Start creating or updating a Service ") - var port int32 - if port = connector.Spec.Scaler.Metadata["svcPort"].IntVal; port == 0 { - err := errors.New("Error: missing required field of metadata . 
") - logger.Error(err, err.Error()) - return err + var svcPort int32 = *connector.Spec.ExposePort + var svc *corev1.Service + switch resourceType { + case vance.CLSvcResType: + { + svc = k8s2.CreateCLUService(connector.Namespace, connector.Name, svcPort) + } + case vance.LBSvcResType: + { + svc = k8s2.CreateLBService(connector.Namespace, connector.Name, svcPort) + } } - service := k8s2.CreateLBService(connector.Namespace, connector.Name, - port) - if err := controllerutil.SetControllerReference(connector, service, r.Scheme); err != nil { + + if err := controllerutil.SetControllerReference(connector, svc, r.Scheme); err != nil { logger.Error(err, "Set service ControllerReference error") return err } - if err := createOrPatchObj(ctx, r, service, connector.Name, + if err := createOrPatchObj(ctx, r, svc, connector.Name, connector.Namespace, logger, vance.SvcResType); err != nil { return err } @@ -130,12 +136,34 @@ func createOrUpdateDeployment( logger.Error(err, "Set deployment ControllerReference error") return err } + userConfig := &corev1.ConfigMap{} + var cmVolume corev1.Volume + if connector.Spec.ConfigRef != "" { + existedKey := client.ObjectKey{ + Namespace: connector.Namespace, + Name: connector.Spec.ConfigRef, + } + if err := r.Get(ctx, existedKey, userConfig); err != nil { + if k8serrs.IsNotFound(err) { + logger.Error(err, "no such ConfigMap ", "configRef", connector.Spec.ConfigRef, + "namespace", connector.Namespace) + } else { + logger.Error(err, "fetch ConfigMap err") + } + return err + } + cmVolume = corev1.Volume{Name: connector.Name + "-cmv"} + cmVolume.ConfigMap = &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: connector.Spec.ConfigRef, + }, + } + } if connector.Spec.Containers != nil { logger.Info("custom pod containers") deployment.Spec.Template.Spec.Containers = connector.Spec.Containers } else { logger.Info("simply provide image url") - deployment.Spec.Template.Spec.Containers = []corev1.Container{ { 
Name: connector.Name, @@ -145,7 +173,21 @@ func createOrUpdateDeployment( }, } } - logger.Info(" Create an in-memory Deployment ", + // Add configMap volume to Spec.Volumes if the cmVolume is ready + // Also add a VolumeMount to Containers[0].VolumeMounts and set the MountPath as "/vance/config" + if cmVolume.Name != "" { + deployment.Spec.Template.Spec.Volumes = []corev1.Volume{ + cmVolume, + } + deployment.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{ + { + Name: cmVolume.Name, + MountPath: "/vance/config", + }, + } + } + + logger.Info("Create an in-memory Deployment ", "deployment", *deployment) if err := createOrPatchObj(ctx, r, deployment, connector.Name, connector.Namespace, logger, vance.DeployResType); err != nil { diff --git a/operator/controllers/scaled_object.go b/operator/controllers/scaled_object.go index 40fce760..b4568c8a 100644 --- a/operator/controllers/scaled_object.go +++ b/operator/controllers/scaled_object.go @@ -11,7 +11,6 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strconv" ) func createOrUpdateScaledObject( @@ -25,97 +24,96 @@ func createOrUpdateScaledObject( logger.Info(" Start creating or updating a ScaledObject ") so := k8s2.CreateScaledObject( connector.Namespace, soName, connector.Name, - string(connector.Spec.ConnectorType), + connector.Spec.ScalingRule.CustomScaling.Triggers, ) - if connector.Spec.Scaler.CheckInterval != nil { - so.Spec.PollingInterval = connector.Spec.Scaler.CheckInterval + if connector.Spec.ScalingRule.CustomScaling.CheckInterval != nil { + so.Spec.PollingInterval = connector.Spec.ScalingRule.CustomScaling.CheckInterval } - if connector.Spec.Scaler.CooldownPeriod != nil { - so.Spec.CooldownPeriod = connector.Spec.Scaler.CooldownPeriod + if connector.Spec.ScalingRule.CustomScaling.CooldownPeriod != nil { + so.Spec.CooldownPeriod = 
connector.Spec.ScalingRule.CustomScaling.CooldownPeriod } - if connector.Spec.Scaler.MaxReplicaCount != nil { - so.Spec.MaxReplicaCount = connector.Spec.Scaler.MaxReplicaCount + if connector.Spec.ScalingRule.MaxReplicaCount != nil { + so.Spec.MaxReplicaCount = connector.Spec.ScalingRule.MaxReplicaCount } - if connector.Spec.Scaler.MinReplicaCount != nil { - so.Spec.MinReplicaCount = connector.Spec.Scaler.MinReplicaCount + if connector.Spec.ScalingRule.MinReplicaCount != nil { + so.Spec.MinReplicaCount = connector.Spec.ScalingRule.MinReplicaCount } userSecret := &corev1.Secret{} - secretName := connector.Spec.Scaler.Metadata["secret"].StrVal - if secretName != "" { - existedKey := client.ObjectKey{ - Namespace: connector.Namespace, - Name: secretName, + for _, trigger := range connector.Spec.ScalingRule.CustomScaling.Triggers { + if _, existedKey := ScalerConf[trigger.Type+"-auth"]; !existedKey { + err := errors.New("TriggerType not found error") + logger.Error(err, "trigger type "+trigger.Type+" not supported") + return err } - var desiredAuth []string - if err := r.Get(ctx, existedKey, userSecret); err != nil { - if k8serrors.IsNotFound(err) { - logger.Error(err, "no such secret ", secretName, - "namespace", connector.Namespace) - } else { - logger.Error(err, "fetch secret err") + var soTrigger = keda.ScaleTriggers{} + soTrigger.Type = trigger.Type + soTrigger.Metadata = trigger.Metadata + if trigger.SecretRef != "" { + existedKey := client.ObjectKey{ + Namespace: connector.Namespace, + Name: trigger.SecretRef, } - return err - } else { - if ok, v := util.IsValidSecret(util.WrapSBM(userSecret.Data), ScalerConf[string(connector.Spec.ConnectorType)+"-auth"]); !ok { - err := errors.New("secret " + secretName + " misses required field") - logger.Error(err, "secret misses required field", "missing field", v) + var desiredAuth []string + if err := r.Get(ctx, existedKey, userSecret); err != nil { + if k8serrors.IsNotFound(err) { + logger.Error(err, "no such secret ", 
"trigger.SecretRef", trigger.SecretRef, + "namespace", connector.Namespace) + } else { + logger.Error(err, "fetch secret err") + } return err } else { - desiredAuth = v + if ok, v := util.IsValidSecret(util.WrapSBM(userSecret.Data), ScalerConf[string(trigger.Type)+"-auth"]); !ok { + err := errors.New("secret " + trigger.SecretRef + " misses required field") + logger.Error(err, "secret misses required field") + return err + } else { + desiredAuth = v + } } - } - // create a TriggerAuthentication if provided secret is valid - taName := resourceName(connector.Name, vance.TAResType) - ta := k8s2.CreateTriggerAuthentication(connector.Namespace, taName) - logger.Info("scalerConfData", "map len", len(desiredAuth)) - for i, v := range desiredAuth { - logger.Info("scalerConfMap", "value", v) - ta.Spec.SecretTargetRef = append(ta.Spec.SecretTargetRef, keda.AuthSecretTargetRef{ - Parameter: v, - Name: secretName, - Key: v, + // create a TriggerAuthentication if provided secret is valid + taName := resourceName(connector.Name, vance.TAResType) + ta := k8s2.CreateTriggerAuthentication(connector.Namespace, taName) + logger.Info("scalerConfData", "map len", len(desiredAuth)) + for i, v := range desiredAuth { + logger.Info("scalerConfMap", "value", v) + ta.Spec.SecretTargetRef = append(ta.Spec.SecretTargetRef, keda.AuthSecretTargetRef{ + Parameter: v, + Name: trigger.SecretRef, + Key: v, + }) + logger.Info("ta", "slice", ta.Spec.SecretTargetRef[i]) + } + logger.Info("TA SecretTargetRef", "len of SecretTargetRef", len(ta.Spec.SecretTargetRef)) + logger = r.Log.WithValues("TriggerAuthentication name", taName, + "TriggerAuthentication namespace", connector.Namespace) + logger.Info("create a TriggerAuthentication", "ta", ta) + if err := controllerutil.SetControllerReference(connector, ta, r.Scheme); err != nil { + logger.Error(err, "Set TriggerAuthentication ControllerReference error") + return err + } + if err := createOrPatchObj(ctx, r, ta, taName, + connector.Namespace, logger, 
vance.TAResType); err != nil { + return err + } + so.Spec.Triggers = append(so.Spec.Triggers, keda.ScaleTriggers{ + Type: trigger.Type, + Metadata: trigger.Metadata, + AuthenticationRef: &keda.ScaledObjectAuthRef{ + Name: taName, + }, }) - logger.Info("ta", "slice", ta.Spec.SecretTargetRef[i]) - } - logger.Info("TA SecretTargetRef", "len of SecretTargetRef", len(ta.Spec.SecretTargetRef)) - logger = r.Log.WithValues("TriggerAuthentication name", taName, - "TriggerAuthentication namespace", connector.Namespace) - logger.Info("create a TriggerAuthentication", "ta", ta) - if err := controllerutil.SetControllerReference(connector, ta, r.Scheme); err != nil { - logger.Error(err, "Set TriggerAuthentication ControllerReference error") - return err - } - if err := createOrPatchObj(ctx, r, ta, taName, - connector.Namespace, logger, vance.TAResType); err != nil { - return err - } - so.Spec.Triggers[0].Metadata = make(map[string]string) - for k, v := range connector.Spec.Scaler.Metadata { - if k == "secret" { - continue + + if err := controllerutil.SetControllerReference(connector, so, r.Scheme); err != nil { + logger.Error(err, "Set ScaledObject ControllerReference error") + return err } - if v.StrVal != "" { - so.Spec.Triggers[0].Metadata[k] = v.StrVal - } else { - so.Spec.Triggers[0].Metadata[k] = strconv.FormatInt(int64(v.IntVal), 10) + if err := createOrPatchObj(ctx, r, so, soName, + connector.Namespace, logger, vance.SoResType); err != nil { + return err } } - logger = r.Log.WithValues( - "ScaledObject name", soName, - "ScaledObject namespace", connector.Namespace) - logger.Info("ScaledObject triggers", "so metadata", so.Spec.Triggers[0].Metadata) - so.Spec.Triggers[0].AuthenticationRef = &keda.ScaledObjectAuthRef{ - Name: taName, - } - if err := controllerutil.SetControllerReference(connector, so, r.Scheme); err != nil { - logger.Error(err, "Set ScaledObject ControllerReference error") - return err - } - if err := createOrPatchObj(ctx, r, so, soName, - 
connector.Namespace, logger, vance.SoResType); err != nil { - return err - } } return nil diff --git a/operator/pkg/config/scale_config.json b/operator/pkg/config/scale_config.json index 124ac59b..a787c293 100644 --- a/operator/pkg/config/scale_config.json +++ b/operator/pkg/config/scale_config.json @@ -1,4 +1,5 @@ { "rabbitmq-auth": [["host"]], - "rabbitmq2-auth": [["a","b","hello_world"],["c","d"],["e","f"]] + "mysql-auth": [["connectionString"],["host","port","dbName","username","password"]], + "aws-sqs-queue-auth": [["awsRoleArn"],["awsAccessKeyID","awsSecretAccessKey"]] } \ No newline at end of file diff --git a/operator/pkg/k8s/scaledobject_keeper.go b/operator/pkg/k8s/scaledobject_keeper.go index 7f50102c..643bea29 100644 --- a/operator/pkg/k8s/scaledobject_keeper.go +++ b/operator/pkg/k8s/scaledobject_keeper.go @@ -2,10 +2,11 @@ package k8s import ( keda "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + vance "github.com/linkall-labs/vance/operator/api/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func CreateScaledObject(nameSpace, name, refName, scalerType string) *keda.ScaledObject { +func CreateScaledObject(nameSpace, name, refName string, triggers []vance.Trigger) *keda.ScaledObject { scaledObject := &keda.ScaledObject{ ObjectMeta: metav1.ObjectMeta{ @@ -16,12 +17,9 @@ func CreateScaledObject(nameSpace, name, refName, scalerType string) *keda.Scale ScaleTargetRef: &keda.ScaleTarget{ Name: refName, }, - Triggers: []keda.ScaleTriggers{ - { - Type: scalerType, - }, - }, + Triggers: []keda.ScaleTriggers{}, }, } + return scaledObject } diff --git a/operator/pkg/k8s/service_keeper.go b/operator/pkg/k8s/service_keeper.go index 4d733d33..1b0344db 100644 --- a/operator/pkg/k8s/service_keeper.go +++ b/operator/pkg/k8s/service_keeper.go @@ -3,10 +3,10 @@ package k8s import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" ) -func CreateLBService(nameSpace, name string, port int32) 
*corev1.Service { +// createBaseService creates a K8S service without setting its service type +func createBaseService(nameSpace, name string, port int32) *corev1.Service { service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: nameSpace, @@ -14,16 +14,27 @@ func CreateLBService(nameSpace, name string, port int32) *corev1.Service { }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{{ - //Port: connector.Spec.Scaler.Metadata["svcPort"].IntVal, - Port: port, - TargetPort: intstr.IntOrString{IntVal: 8080}, + Port: port, }, }, Selector: map[string]string{ "app": name, }, - Type: corev1.ServiceTypeLoadBalancer, }, } return service } + +// CreateCLUService creates a K8S ClusterIP Service +func CreateCLUService(nameSpace, name string, port int32) *corev1.Service { + service := createBaseService(nameSpace, name, port) + service.Spec.Type = corev1.ServiceTypeClusterIP + return service +} + +// CreateLBService creates a K8S LoadBalancer Service +func CreateLBService(nameSpace, name string, port int32) *corev1.Service { + service := createBaseService(nameSpace, name, port) + service.Spec.Type = corev1.ServiceTypeLoadBalancer + return service +}