Skip to content

Commit

Permalink
Refactor couchdb scaler (#6267)
Browse files Browse the repository at this point in the history
* Refactor couchdb scaler

Signed-off-by: rickbrouwer <rickbrouwer@gmail.com>

* Update

Signed-off-by: rickbrouwer <rickbrouwer@gmail.com>

---------

Signed-off-by: rickbrouwer <rickbrouwer@gmail.com>
  • Loading branch information
rickbrouwer authored Nov 3, 2024
1 parent a825451 commit baec715
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 171 deletions.
263 changes: 107 additions & 156 deletions pkg/scalers/couchdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"net"
"strconv"

couchdb "github.com/go-kivik/couchdb/v3"
"github.com/go-kivik/kivik/v3"
Expand All @@ -19,216 +18,168 @@ import (

type couchDBScaler struct {
metricType v2.MetricTargetType
metadata *couchDBMetadata
metadata couchDBMetadata
client *kivik.Client
logger logr.Logger
}

type couchDBMetadata struct {
ConnectionString string `keda:"name=connectionString,order=authParams;triggerMetadata;resolvedEnv,optional"`
Host string `keda:"name=host,order=authParams;triggerMetadata,optional"`
Port string `keda:"name=port,order=authParams;triggerMetadata,optional"`
Username string `keda:"name=username,order=authParams;triggerMetadata,optional"`
Password string `keda:"name=password,order=authParams;triggerMetadata;resolvedEnv,optional"`
DBName string `keda:"name=dbName,order=authParams;triggerMetadata,optional"`
Query string `keda:"name=query,order=triggerMetadata,optional"`
QueryValue int64 `keda:"name=queryValue,order=triggerMetadata,optional"`
ActivationQueryValue int64 `keda:"name=activationQueryValue,order=triggerMetadata,default=0,optional"`
TriggerIndex int
}

func (m *couchDBMetadata) Validate() error {
if m.ConnectionString == "" {
if m.Host == "" {
return fmt.Errorf("no host given")
}
if m.Port == "" {
return fmt.Errorf("no port given")
}
if m.Username == "" {
return fmt.Errorf("no username given")
}
if m.Password == "" {
return fmt.Errorf("no password given")
}
if m.DBName == "" {
return fmt.Errorf("no dbName given")
}
}
return nil
}

type couchDBQueryRequest struct {
Selector map[string]interface{} `json:"selector"`
Fields []string `json:"fields"`
}

type couchDBMetadata struct {
connectionString string
host string
port string
username string
password string
dbName string
query string
queryValue int64
activationQueryValue int64
triggerIndex int
}

type Res struct {
ID string `json:"_id"`
Feet int `json:"feet"`
Greeting string `json:"greeting"`
}

func (s *couchDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("coucdb-%s", s.metadata.dbName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.queryValue),
func NewCouchDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,

meta, err := parseCouchDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing couchdb metadata: %w", err)
}
return []v2.MetricSpec{metricSpec}
}

func (s couchDBScaler) Close(ctx context.Context) error {
if s.client != nil {
err := s.client.Close(ctx)
if err != nil {
s.logger.Error(err, fmt.Sprintf("failed to close couchdb connection, because of %v", err))
return err
}
connStr := meta.ConnectionString
if connStr == "" {
addr := net.JoinHostPort(meta.Host, meta.Port)
connStr = "http://" + addr
}
return nil
}

func (s *couchDBScaler) getQueryResult(ctx context.Context) (int64, error) {
db := s.client.DB(ctx, s.metadata.dbName)
var request couchDBQueryRequest
err := json.Unmarshal([]byte(s.metadata.query), &request)
client, err := kivik.New("couch", connStr)
if err != nil {
s.logger.Error(err, fmt.Sprintf("Couldn't unmarshal query string because of %v", err))
return 0, err
return nil, fmt.Errorf("error creating couchdb client: %w", err)
}
rows, err := db.Find(ctx, request, nil)

err = client.Authenticate(ctx, couchdb.BasicAuth("admin", meta.Password))
if err != nil {
s.logger.Error(err, fmt.Sprintf("failed to fetch rows because of %v", err))
return 0, err
return nil, fmt.Errorf("error authenticating with couchdb: %w", err)
}
var count int64
for rows.Next() {
count++
res := Res{}
if err := rows.ScanDoc(&res); err != nil {
s.logger.Error(err, fmt.Sprintf("failed to scan the doc because of %v", err))
return 0, err
}

isConnected, err := client.Ping(ctx)
if !isConnected || err != nil {
return nil, fmt.Errorf("failed to ping couchdb: %w", err)
}
return count, nil

return &couchDBScaler{
metricType: metricType,
metadata: meta,
client: client,
logger: InitializeLogger(config, "couchdb_scaler"),
}, nil
}

func parseCouchDBMetadata(config *scalersconfig.ScalerConfig) (*couchDBMetadata, string, error) {
var connStr string
var err error
func parseCouchDBMetadata(config *scalersconfig.ScalerConfig) (couchDBMetadata, error) {
meta := couchDBMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing couchdb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %w", val, err)
}
meta.queryValue = queryValue
} else {
if config.AsMetricSource {
meta.queryValue = 0
} else {
return nil, "", fmt.Errorf("no queryValue given")
}
if meta.QueryValue == 0 && !config.AsMetricSource {
return meta, fmt.Errorf("no queryValue given")
}

meta.activationQueryValue = 0
if val, ok := config.TriggerMetadata["activationQueryValue"]; ok {
activationQueryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %w", val, err)
}
meta.activationQueryValue = activationQueryValue
if config.AsMetricSource {
meta.QueryValue = 0
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
if err != nil {
return nil, "", err
}
meta.dbName = dbName

switch {
case config.AuthParams["connectionString"] != "":
meta.connectionString = config.AuthParams["connectionString"]
case config.TriggerMetadata["connectionStringFromEnv"] != "":
meta.connectionString = config.ResolvedEnv[config.TriggerMetadata["connectionStringFromEnv"]]
default:
meta.connectionString = ""
host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, "", err
}
meta.host = host

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, "", err
}
meta.port = port

username, err := GetFromAuthOrMeta(config, "username")
if err != nil {
return nil, "", err
}
meta.username = username
meta.TriggerIndex = config.TriggerIndex
return meta, nil
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
func (s *couchDBScaler) Close(ctx context.Context) error {
if s.client != nil {
if err := s.client.Close(ctx); err != nil {
s.logger.Error(err, "failed to close couchdb connection")
return err
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := net.JoinHostPort(meta.host, meta.port)
// nosemgrep: db-connection-string
connStr = "http://" + addr
}
meta.triggerIndex = config.TriggerIndex
return &meta, connStr, nil
return nil
}

func NewCouchDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
func (s *couchDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("coucdb-%s", s.metadata.DBName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.QueryValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

meta, connStr, err := parseCouchDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing couchDB metadata, because of %w", err)
}
func (s *couchDBScaler) getQueryResult(ctx context.Context) (int64, error) {
db := s.client.DB(ctx, s.metadata.DBName)

client, err := kivik.New("couch", connStr)
if err != nil {
return nil, fmt.Errorf("%w", err)
var request couchDBQueryRequest
if err := json.Unmarshal([]byte(s.metadata.Query), &request); err != nil {
return 0, fmt.Errorf("error unmarshaling query: %w", err)
}

err = client.Authenticate(ctx, couchdb.BasicAuth("admin", meta.password))
rows, err := db.Find(ctx, request, nil)
if err != nil {
return nil, err
return 0, fmt.Errorf("error executing query: %w", err)
}

isconnected, err := client.Ping(ctx)
if !isconnected {
return nil, fmt.Errorf("%w", err)
}
if err != nil {
return nil, fmt.Errorf("failed to ping couchDB, because of %w", err)
var count int64
for rows.Next() {
count++
var res Res
if err := rows.ScanDoc(&res); err != nil {
return 0, fmt.Errorf("error scanning document: %w", err)
}
}

return &couchDBScaler{
metricType: metricType,
metadata: meta,
client: client,
logger: InitializeLogger(config, "couchdb_scaler"),
}, nil
return count, nil
}

// GetMetricsAndActivity query from couchDB,and return to external metrics and activity
func (s *couchDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
result, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("failed to inspect couchdb, because of %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("failed to inspect couchdb: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(result))

return append([]external_metrics.ExternalMetricValue{}, metric), result > s.metadata.activationQueryValue, nil
return []external_metrics.ExternalMetricValue{metric}, result > s.metadata.ActivationQueryValue, nil
}
Loading

0 comments on commit baec715

Please sign in to comment.