Skip to content

Commit

Permalink
Add Cassandra scaler and tests (#2211)
Browse files Browse the repository at this point in the history
Signed-off-by: nilayasiktoprak <nilayasiktoprak@gmail.com>
  • Loading branch information
nilayasiktoprak authored Oct 27, 2021
1 parent 49d60dc commit cfb18c5
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 // indirect
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/c2h5oh/datasize v0.0.0-20171227191756-4eba002a5eae/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
Expand Down Expand Up @@ -391,6 +393,8 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 h1:px9qUCy/RNJNsfCam4m2IxWGxNuimkrioEF0vrrbPsg=
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down Expand Up @@ -534,6 +538,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand Down
229 changes: 229 additions & 0 deletions pkg/scalers/cassandra_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/gocql/gocql"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// cassandraScaler exposes a data pointer to CassandraMetadata and gocql.Session connection.
type cassandraScaler struct {
metadata *CassandraMetadata
session *gocql.Session
}

// CassandraMetadata defines metadata used by KEDA to query a Cassandra table.
type CassandraMetadata struct {
username string
password string
clusterIPAddress string
port int
consistency gocql.Consistency
protocolVersion int
keyspace string
query string
targetQueryValue int
metricName string
scalerIndex int
}

var cassandraLog = logf.Log.WithName("cassandra_scaler")

// NewCassandraScaler creates a new Cassandra scaler.
func NewCassandraScaler(config *ScalerConfig) (Scaler, error) {
meta, err := ParseCassandraMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing cassandra metadata: %s", err)
}

session, err := NewCassandraSession(meta)
if err != nil {
return nil, fmt.Errorf("error establishing cassandra session: %s", err)
}

return &cassandraScaler{
metadata: meta,
session: session,
}, nil
}

// ParseCassandraMetadata parses the metadata and returns a CassandraMetadata or an error if the ScalerConfig is invalid.
func ParseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) {
meta := CassandraMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["targetQueryValue"]; ok {
targetQueryValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetQueryValue parsing error %s", err.Error())
}
meta.targetQueryValue = targetQueryValue
} else {
return nil, fmt.Errorf("no targetQueryValue given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.TriggerMetadata["port"]; ok {
port, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("port parsing error %s", err.Error())
}
meta.port = port
}

if val, ok := config.TriggerMetadata["clusterIPAddress"]; ok {
switch p := meta.port; {
case p > 0:
meta.clusterIPAddress = fmt.Sprintf("%s:%d", val, meta.port)
case strings.Contains(val, ":"):
meta.clusterIPAddress = val
default:
return nil, fmt.Errorf("no port given")
}
} else {
return nil, fmt.Errorf("no cluster IP address given")
}

if val, ok := config.TriggerMetadata["protocolVersion"]; ok {
protocolVersion, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("protocolVersion parsing error %s", err.Error())
}
meta.protocolVersion = protocolVersion
} else {
meta.protocolVersion = 4
}

if val, ok := config.TriggerMetadata["consistency"]; ok {
meta.consistency = gocql.ParseConsistency(val)
} else {
meta.consistency = gocql.One
}

if val, ok := config.TriggerMetadata["keyspace"]; ok {
meta.keyspace = val
} else {
return nil, fmt.Errorf("no keyspace given")
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("cassandra-%s", val))
} else {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("cassandra-%s", meta.keyspace))
}

if val, ok := config.AuthParams["password"]; ok {
meta.password = val
} else {
return nil, fmt.Errorf("no password given")
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

// NewCassandraSession returns a new Cassandra session for the provided CassandraMetadata.
func NewCassandraSession(meta *CassandraMetadata) (*gocql.Session, error) {
cluster := gocql.NewCluster(meta.clusterIPAddress)
cluster.ProtoVersion = meta.protocolVersion
cluster.Consistency = meta.consistency
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: meta.username,
Password: meta.password,
}

session, err := cluster.CreateSession()
if err != nil {
cassandraLog.Error(err, "found error creating session")
return nil, err
}

return session, nil
}

// IsActive returns true if there are pending events to be processed.
func (s *cassandraScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.GetQueryResult(ctx)
if err != nil {
return false, fmt.Errorf("error inspecting cassandra: %s", err)
}

return messages > 0, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler.
func (s *cassandraScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.targetQueryValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns a value for a supported metric or an error if there is a problem getting the metric.
func (s *cassandraScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.GetQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting cassandra: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetQueryResult returns the result of the scaler query.
func (s *cassandraScaler) GetQueryResult(ctx context.Context) (int, error) {
var value int
if err := s.session.Query(s.metadata.query).WithContext(ctx).Scan(&value); err != nil {
if err != gocql.ErrNotFound {
cassandraLog.Error(err, "query failed")
return 0, err
}
}

return value, nil
}

// Close closes the Cassandra session connection.
func (s *cassandraScaler) Close() error {
s.session.Close()

return nil
}
79 changes: 79 additions & 0 deletions pkg/scalers/cassandra_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package scalers

import (
"testing"

"github.com/gocql/gocql"
)

type parseCassandraMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string
}

type cassandraMetricIdentifier struct {
metadataTestData *parseCassandraMetadataTestData
scalerIndex int
name string
}

var testCassandraMetadata = []parseCassandraMetadataTestData{
// nothing passed
{map[string]string{}, true, map[string]string{}},
// everything is passed in verbatim
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "clusterIPAddress": "cassandra.test", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no metricName passed, metricName is generated from keyspace
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no query passed
{map[string]string{"targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no targetQueryValue passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no username passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no port passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no clusterIPAddress passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no keyspace passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no password passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{}},
}

var cassandraMetricIdentifiers = []cassandraMetricIdentifier{
{&testCassandraMetadata[1], 0, "s0-cassandra-myMetric"},
{&testCassandraMetadata[2], 1, "s1-cassandra-test_keyspace"},
}

func TestCassandraParseMetadata(t *testing.T) {
testCaseNum := 1
for _, testData := range testCassandraMetadata {
_, err := ParseCassandraMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Errorf("Expected success but got error for unit test # %v", testCaseNum)
}
if testData.isError && err == nil {
t.Errorf("Expected error but got success for unit test # %v", testCaseNum)
}
testCaseNum++
}
}

func TestCassandraGetMetricSpecForScaling(t *testing.T) {
for _, testData := range cassandraMetricIdentifiers {
meta, err := ParseCassandraMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ScalerIndex: testData.scalerIndex, AuthParams: testData.metadataTestData.authParams})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
cluster := gocql.NewCluster(meta.clusterIPAddress)
session, _ := cluster.CreateSession()
mockCassandraScaler := cassandraScaler{meta, session}

metricSpec := mockCassandraScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewAzureQueueScaler(config)
case "azure-servicebus":
return scalers.NewAzureServiceBusScaler(config)
case "cassandra":
return scalers.NewCassandraScaler(config)
case "cpu":
return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config)
case "cron":
Expand Down
Loading

0 comments on commit cfb18c5

Please sign in to comment.