Skip to content

Commit

Permalink
Merge pull request #65 from linkall-labs/dev
Browse files Browse the repository at this point in the history
Update operator
  • Loading branch information
JieDing authored Sep 7, 2022
2 parents 5750f58 + 7f2ebca commit 3533748
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 216 deletions.
92 changes: 45 additions & 47 deletions operator/api/v1alpha1/connector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand All @@ -31,27 +30,52 @@ type ConnectorSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

ConnectorType Type `json:"type"`
// The image uri of your connector
Image string `json:"image,omitempty"`
// Specify the container in detail if needed
Containers []v1.Container `json:"containers,omitempty"`

Scaler *ScalerSpec `json:"scalerSpec,omitempty"`
ExposePort *int32 `json:"exposePort"`

ConfigRef string `json:"configRef,omitempty"`

SecretRef string `json:"secretRef,omitempty"`

ScalingRule *ScalingRule `json:"scalingRule,omitempty"`
}

type ScalerSpec struct {
// +optional
CheckInterval *int32 `json:"checkInterval,omitempty"`
// +optional
CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"`
type ScalingRule struct {
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
// +optional
MinReplicaCount *int32 `json:"minReplicaCount,omitempty"`
// +optional
CustomScaling *CustomScaling `json:"customScaling,omitempty"`
// +optional
HTTPScaling *HTTPScaling `json:"httpScaling,omitempty"`
}

Metadata map[string]intstr.IntOrString `json:"metadata,omitempty"`
ScalerSecret string `json:"scalerSecret,omitempty"`
type HTTPScaling struct {
Host string `json:"host"`
// +optional
SvcType string `json:"svcType,omitempty"`
// +optional
PendingRequests int32 `json:"pendingRequests,omitempty" description:"The target metric value for the HPA (Default 100)"`
}

type CustomScaling struct {
// +optional
CheckInterval *int32 `json:"checkInterval,omitempty"`
// +optional
CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"`

Triggers []Trigger `json:"triggers"`
}

type Trigger struct {
Type string `json:"type"`
Metadata map[string]string `json:"metadata"`
SecretRef string `json:"secretRef,omitempty"`
}

// ConnectorStatus defines the observed state of Connector
Expand Down Expand Up @@ -118,55 +142,29 @@ type ConnectorList struct {
Items []Connector `json:"items"`
}

type TriggerType string

const (
MySQL TriggerType = "mysql"
SQS TriggerType = "aws-sqs-queue"
)

type ResourceType string

const (
DeployResType ResourceType = "deployment"
SvcResType ResourceType = "service"
CLSvcResType ResourceType = "cl-service"
LBSvcResType ResourceType = "lb-service"
HttpSOResType ResourceType = "httpScaledObject"
SoResType ResourceType = "scaledObject"
TAResType ResourceType = "triggerAuthentication"
)

// Type describes the pattern the image uses to obtain data and reflects to a corresponding scaler.
// +enum
type Type string

const (
// Http type means that the image is a webserver waiting data to be pushed to it.
// This type also reflects to a http scaler.
Http Type = "http"
// ActiveMQ type means that the image fetches data from an ActiveMQ queue
// This type also reflects to a ActiveMQ scaler.
ActiveMQ Type = "activemq"
// ArtemisQue type means that the image fetches data from an ActiveMQ Artemis queue
// This type also reflects to an artemis-queue scaler.
ArtemisQue Type = "artemis-queue"
// Kafka type means that the image fetches data from an Apache Kafka topic
// This type also reflects to a Kafka scaler.
Kafka Type = "kafka"
// AWSCloudwatch type means that the image fetches data from AWS Cloudwatch
// This type also reflects to a AWSCloudwatch scaler.
AWSCloudwatch Type = "aws-cloudwatch"
// AWSKinesisStream type means that the image fetches data from AWS Kinesis Stream
// This type also reflects to a AWSKinesisStream scaler.
AWSKinesisStream Type = "aws-kinesis-stream"
// AWSSqsQueue type means that the image fetches data from AWS Sqs Queue
// This type also reflects to a AWSSqsQueue scaler.
AWSSqsQueue Type = "aws-sqs-queue"
// AZUREAppInsights type means that the image fetches data from Azure Application Insights
// This type also reflects to a AZUREAppInsights scaler.
AZUREAppInsights Type = "azure-app-insights"
// Rabbitmq type means that the image fetches data from Rabbitmq
// This type also reflects to a Rabbitmq scaler.
Rabbitmq Type = "rabbitmq"
)

func (in *Connector) String() string {

return fmt.Sprintf("Image [%s], Mode [%s]",
in.Spec.Image,
in.Spec.ConnectorType)
return fmt.Sprintf("Image [%s]",
in.Spec.Image)
}

func init() {
Expand Down
67 changes: 47 additions & 20 deletions operator/config/crd/bases/vance.io_connectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
spec:
description: ConnectorSpec defines the desired state of Connector
properties:
configRef:
type: string
containers:
description: Specify the container in detail if needed
items:
Expand Down Expand Up @@ -1252,39 +1254,64 @@ spec:
- name
type: object
type: array
exposePort:
format: int32
type: integer
image:
description: The image uri of your connector
type: string
scalerSpec:
scalingRule:
properties:
checkInterval:
format: int32
type: integer
cooldownPeriod:
format: int32
type: integer
customScaling:
properties:
checkInterval:
format: int32
type: integer
cooldownPeriod:
format: int32
type: integer
triggers:
items:
properties:
metadata:
additionalProperties:
type: string
type: object
secretRef:
type: string
type:
type: string
required:
- metadata
- type
type: object
type: array
required:
- triggers
type: object
httpScaling:
properties:
host:
type: string
pendingRequests:
format: int32
type: integer
svcType:
type: string
required:
- host
type: object
maxReplicaCount:
format: int32
type: integer
metadata:
additionalProperties:
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
type: object
minReplicaCount:
format: int32
type: integer
scalerSecret:
type: string
type: object
type:
description: Type describes the pattern the image uses to obtain data
and reflects to a corresponding scaler.
secretRef:
type: string
required:
- type
- exposePort
type: object
status:
description: ConnectorStatus defines the observed state of Connector
Expand Down
25 changes: 14 additions & 11 deletions operator/config/samples/rabbitmq_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ metadata:
data:
host: YW1xcDovL3VzZXI6UEFTU1dPUkRAcmFiYml0bXEuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbDo1Njcy
---
apiVersion: cloud.vance/v1alpha1
apiVersion: vance.io/v1alpha1
kind: Connector
metadata:
name: rabbitmq-connector-sample
name: rabbitmq-sample
spec:
type: rabbitmq
# TODO(user): Add fields here
exposePort: 8080
containers:
- name: rabbitmq-connector-sample
image: jeffhollan/rabbitmq-client:dev
Expand All @@ -19,11 +20,13 @@ spec:
- receive
args:
- "amqp://user:PASSWORD@rabbitmq.default.svc.cluster.local:5672"
scalerSpec:
checkInterval: 5
cooldownPeriod: 5
metadata:
secret: rabbitmq-consumer-secret
queueName: hello
mode: QueueLength
value: "50"
secretRef: rabbitmq-consumer-secret
scalingRule:
customScaling:
triggers:
- type: rabbitmq
metadata:
queueName: hello
mode: QueueLength
value: "50"
secretRef: rabbitmq-consumer-secret
11 changes: 5 additions & 6 deletions operator/config/samples/webhook_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ metadata:
name: webhook-sample
spec:
# TODO(user): Add fields here
type: http
image: tomcat:8.0.18-jre8
scalerSpec:
metadata:
exposePort: 8080
scalingRule:
minReplicaCount: 0
httpScaling:
host: myhost.com
svcPort: 8080
maxReplica: 30
pendingRequests: 20
pendingRequests: 10
27 changes: 18 additions & 9 deletions operator/controllers/connector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,34 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

logger.Error(err, "Getting connector failed")
// 返回错误信息给外部
// return error to outside
return ctrl.Result{}, err
}
// create Deployment for the connector.
if err = createOrUpdateAPIResources(ctx, r, connector, vance.DeployResType); err != nil {
return ctrl.Result{}, err
}
if connector.Spec.Scaler == nil {
logger.Info("scaler is nil")
// create a cluster svc for the connector
if err = createOrUpdateAPIResources(ctx, r, connector, vance.CLSvcResType); err != nil {
return ctrl.Result{}, err
}
if connector.Spec.ScalingRule == nil {
logger.Info("ScalingRule is nil")
} else {
logger.Info("scaler is not nil")
// build a http scaler if the type is http, otherwise build other scalers
if connector.Spec.ConnectorType == vance.Http {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.SvcResType); err != nil {
return ctrl.Result{}, err
logger.Info("ScalingRule is not nil")

if connector.Spec.ScalingRule.HTTPScaling != nil {
// set service type to LB and create an ingress for it
if connector.Spec.ScalingRule.HTTPScaling.SvcType == "LB" {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.LBSvcResType); err != nil {
return ctrl.Result{}, err
}
}

if err = createOrUpdateAPIResources(ctx, r, connector, vance.HttpSOResType); err != nil {
return ctrl.Result{}, err
}
} else {
} else if connector.Spec.ScalingRule.CustomScaling != nil {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.SoResType); err != nil {
return ctrl.Result{}, err
}
Expand Down
Loading

0 comments on commit 3533748

Please sign in to comment.