Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update operator #65

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 45 additions & 47 deletions operator/api/v1alpha1/connector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand All @@ -31,27 +30,52 @@ type ConnectorSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

ConnectorType Type `json:"type"`
// The image uri of your connector
Image string `json:"image,omitempty"`
// Specify the container in detail if needed
Containers []v1.Container `json:"containers,omitempty"`

Scaler *ScalerSpec `json:"scalerSpec,omitempty"`
ExposePort *int32 `json:"exposePort"`

ConfigRef string `json:"configRef,omitempty"`

SecretRef string `json:"secretRef,omitempty"`

ScalingRule *ScalingRule `json:"scalingRule,omitempty"`
}

type ScalerSpec struct {
// +optional
CheckInterval *int32 `json:"checkInterval,omitempty"`
// +optional
CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"`
type ScalingRule struct {
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
// +optional
MinReplicaCount *int32 `json:"minReplicaCount,omitempty"`
// +optional
CustomScaling *CustomScaling `json:"customScaling,omitempty"`
// +optional
HTTPScaling *HTTPScaling `json:"httpScaling,omitempty"`
}

Metadata map[string]intstr.IntOrString `json:"metadata,omitempty"`
ScalerSecret string `json:"scalerSecret,omitempty"`
type HTTPScaling struct {
Host string `json:"host"`
// +optional
SvcType string `json:"svcType,omitempty"`
// +optional
PendingRequests int32 `json:"pendingRequests,omitempty" description:"The target metric value for the HPA (Default 100)"`
}

type CustomScaling struct {
// +optional
CheckInterval *int32 `json:"checkInterval,omitempty"`
// +optional
CooldownPeriod *int32 `json:"cooldownPeriod,omitempty"`

Triggers []Trigger `json:"triggers"`
}

type Trigger struct {
Type string `json:"type"`
Metadata map[string]string `json:"metadata"`
SecretRef string `json:"secretRef,omitempty"`
}

// ConnectorStatus defines the observed state of Connector
Expand Down Expand Up @@ -118,55 +142,29 @@ type ConnectorList struct {
Items []Connector `json:"items"`
}

type TriggerType string

const (
MySQL TriggerType = "mysql"
SQS TriggerType = "aws-sqs-queue"
)

type ResourceType string

const (
DeployResType ResourceType = "deployment"
SvcResType ResourceType = "service"
CLSvcResType ResourceType = "cl-service"
LBSvcResType ResourceType = "lb-service"
HttpSOResType ResourceType = "httpScaledObject"
SoResType ResourceType = "scaledObject"
TAResType ResourceType = "triggerAuthentication"
)

// Type describes the pattern the image uses to obtain data and reflects to a corresponding scaler.
// +enum
type Type string

const (
// Http type means that the image is a webserver waiting data to be pushed to it.
// This type also reflects to a http scaler.
Http Type = "http"
// ActiveMQ type means that the image fetches data from an ActiveMQ queue
// This type also reflects to a ActiveMQ scaler.
ActiveMQ Type = "activemq"
// ArtemisQue type means that the image fetches data from an ActiveMQ Artemis queue
// This type also reflects to an artemis-queue scaler.
ArtemisQue Type = "artemis-queue"
// Kafka type means that the image fetches data from an Apache Kafka topic
// This type also reflects to a Kafka scaler.
Kafka Type = "kafka"
// AWSCloudwatch type means that the image fetches data from AWS Cloudwatch
// This type also reflects to a AWSCloudwatch scaler.
AWSCloudwatch Type = "aws-cloudwatch"
// AWSKinesisStream type means that the image fetches data from AWS Kinesis Stream
// This type also reflects to a AWSKinesisStream scaler.
AWSKinesisStream Type = "aws-kinesis-stream"
// AWSSqsQueue type means that the image fetches data from AWS Sqs Queue
// This type also reflects to a AWSSqsQueue scaler.
AWSSqsQueue Type = "aws-sqs-queue"
// AZUREAppInsights type means that the image fetches data from Azure Application Insights
// This type also reflects to a AZUREAppInsights scaler.
AZUREAppInsights Type = "azure-app-insights"
// Rabbitmq type means that the image fetches data from Rabbitmq
// This type also reflects to a Rabbitmq scaler.
Rabbitmq Type = "rabbitmq"
)

func (in *Connector) String() string {

return fmt.Sprintf("Image [%s], Mode [%s]",
in.Spec.Image,
in.Spec.ConnectorType)
return fmt.Sprintf("Image [%s]",
in.Spec.Image)
}

func init() {
Expand Down
67 changes: 47 additions & 20 deletions operator/config/crd/bases/vance.io_connectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
spec:
description: ConnectorSpec defines the desired state of Connector
properties:
configRef:
type: string
containers:
description: Specify the container in detail if needed
items:
Expand Down Expand Up @@ -1252,39 +1254,64 @@ spec:
- name
type: object
type: array
exposePort:
format: int32
type: integer
image:
description: The image uri of your connector
type: string
scalerSpec:
scalingRule:
properties:
checkInterval:
format: int32
type: integer
cooldownPeriod:
format: int32
type: integer
customScaling:
properties:
checkInterval:
format: int32
type: integer
cooldownPeriod:
format: int32
type: integer
triggers:
items:
properties:
metadata:
additionalProperties:
type: string
type: object
secretRef:
type: string
type:
type: string
required:
- metadata
- type
type: object
type: array
required:
- triggers
type: object
httpScaling:
properties:
host:
type: string
pendingRequests:
format: int32
type: integer
svcType:
type: string
required:
- host
type: object
maxReplicaCount:
format: int32
type: integer
metadata:
additionalProperties:
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
type: object
minReplicaCount:
format: int32
type: integer
scalerSecret:
type: string
type: object
type:
description: Type describes the pattern the image uses to obtain data
and reflects to a corresponding scaler.
secretRef:
type: string
required:
- type
- exposePort
type: object
status:
description: ConnectorStatus defines the observed state of Connector
Expand Down
25 changes: 14 additions & 11 deletions operator/config/samples/rabbitmq_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ metadata:
data:
host: YW1xcDovL3VzZXI6UEFTU1dPUkRAcmFiYml0bXEuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbDo1Njcy
---
apiVersion: cloud.vance/v1alpha1
apiVersion: vance.io/v1alpha1
kind: Connector
metadata:
name: rabbitmq-connector-sample
name: rabbitmq-sample
spec:
type: rabbitmq
# TODO(user): Add fields here
exposePort: 8080
containers:
- name: rabbitmq-connector-sample
image: jeffhollan/rabbitmq-client:dev
Expand All @@ -19,11 +20,13 @@ spec:
- receive
args:
- "amqp://user:PASSWORD@rabbitmq.default.svc.cluster.local:5672"
scalerSpec:
checkInterval: 5
cooldownPeriod: 5
metadata:
secret: rabbitmq-consumer-secret
queueName: hello
mode: QueueLength
value: "50"
secretRef: rabbitmq-consumer-secret
scalingRule:
customScaling:
triggers:
- type: rabbitmq
metadata:
queueName: hello
mode: QueueLength
value: "50"
secretRef: rabbitmq-consumer-secret
11 changes: 5 additions & 6 deletions operator/config/samples/webhook_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ metadata:
name: webhook-sample
spec:
# TODO(user): Add fields here
type: http
image: tomcat:8.0.18-jre8
scalerSpec:
metadata:
exposePort: 8080
scalingRule:
minReplicaCount: 0
httpScaling:
host: myhost.com
svcPort: 8080
maxReplica: 30
pendingRequests: 20
pendingRequests: 10
27 changes: 18 additions & 9 deletions operator/controllers/connector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,34 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

logger.Error(err, "Getting connector failed")
// 返回错误信息给外部
// return error to outside
return ctrl.Result{}, err
}
// create Deployment for the connector.
if err = createOrUpdateAPIResources(ctx, r, connector, vance.DeployResType); err != nil {
return ctrl.Result{}, err
}
if connector.Spec.Scaler == nil {
logger.Info("scaler is nil")
// create a cluster svc for the connector
if err = createOrUpdateAPIResources(ctx, r, connector, vance.CLSvcResType); err != nil {
return ctrl.Result{}, err
}
if connector.Spec.ScalingRule == nil {
logger.Info("ScalingRule is nil")
} else {
logger.Info("scaler is not nil")
// build a http scaler if the type is http, otherwise build other scalers
if connector.Spec.ConnectorType == vance.Http {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.SvcResType); err != nil {
return ctrl.Result{}, err
logger.Info("ScalingRule is not nil")

if connector.Spec.ScalingRule.HTTPScaling != nil {
// set service type to LB and create an ingress for it
if connector.Spec.ScalingRule.HTTPScaling.SvcType == "LB" {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.LBSvcResType); err != nil {
return ctrl.Result{}, err
}
}

if err = createOrUpdateAPIResources(ctx, r, connector, vance.HttpSOResType); err != nil {
return ctrl.Result{}, err
}
} else {
} else if connector.Spec.ScalingRule.CustomScaling != nil {
if err = createOrUpdateAPIResources(ctx, r, connector, vance.SoResType); err != nil {
return ctrl.Result{}, err
}
Expand Down
Loading