Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Ranjith Gopal committed Jun 24, 2024
1 parent b2746a8 commit bd3c3cb
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Here is an overview of all new **experimental** features:
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))

### Fixes

Expand Down
118 changes: 116 additions & 2 deletions pkg/scalers/cassandra_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package scalers

import (
"context"
"errors"
"fmt"
"net"
"os"
"strconv"
"strings"

Expand All @@ -28,6 +30,10 @@ type cassandraScaler struct {
type CassandraMetadata struct {
username string
password string
enableTLS bool
cert string
key string
ca string
clusterIPAddress string
port int
consistency gocql.Consistency
Expand All @@ -39,6 +45,11 @@ type CassandraMetadata struct {
triggerIndex int
}

const (
tlsEnable = "enable"
tlsDisable = "disable"
)

// NewCassandraScaler creates a new Cassandra scaler.
func NewCassandraScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand Down Expand Up @@ -68,7 +79,8 @@ func NewCassandraScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

// parseCassandraMetadata parses the metadata and returns a CassandraMetadata or an error if the ScalerConfig is invalid.
func parseCassandraMetadata(config *scalersconfig.ScalerConfig) (*CassandraMetadata, error) {
meta := CassandraMetadata{}
meta := &CassandraMetadata{}
var err error

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
Expand Down Expand Up @@ -157,9 +169,86 @@ func parseCassandraMetadata(config *scalersconfig.ScalerConfig) (*CassandraMetad
return nil, fmt.Errorf("no password given")
}

if err = parseCassandraTLS(config, meta); err != nil {
return meta, err
}

meta.triggerIndex = config.TriggerIndex

return &meta, nil
return meta, nil
}

func createTempFile(prefix string, content string) (string, error) {
tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "cassandra")
err := os.MkdirAll(tempKrbDir, 0700)
if err != nil {
return "", fmt.Errorf(`error creating temporary directory: %s. Error: %w
Note, when running in a container a writable /tmp/kerberos emptyDir must be mounted. Refer to documentation`, tempKrbDir, err)
}

f, err := os.CreateTemp(tempKrbDir, prefix+"-*.pem")
if err != nil {
return "", err
}
defer f.Close()

_, err = f.WriteString(content)
if err != nil {
return "", err
}

return f.Name(), nil
}

func parseCassandraTLS(config *scalersconfig.ScalerConfig, meta *CassandraMetadata) error {
meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if val == tlsEnable {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
caCertGiven := config.AuthParams["ca"] != ""
if certGiven && !keyGiven {
return errors.New("no key given")
}
if keyGiven && !certGiven {
return errors.New("no cert given")
}
if !keyGiven && !certGiven {
return errors.New("no cert/key given")
}

certFilePath, err := createTempFile("cert", config.AuthParams["cert"])
if err != nil {
// handle error
return errors.New("Error creating cert file: " + err.Error())
}

keyFilePath, err := createTempFile("key", config.AuthParams["key"])
if err != nil {
// handle error
return errors.New("Error creating key file: " + err.Error())
}

meta.cert = certFilePath
meta.key = keyFilePath
meta.ca = config.AuthParams["ca"]
if !caCertGiven {
meta.ca = ""
} else {
caCertFilePath, err := createTempFile("caCert", config.AuthParams["ca"])
meta.ca = caCertFilePath
if err != nil {
// handle error
return errors.New("Error creating ca file: " + err.Error())
}
}
meta.enableTLS = true
} else if val != tlsDisable {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}
return nil
}

// newCassandraSession returns a new Cassandra session for the provided CassandraMetadata.
Expand All @@ -172,6 +261,14 @@ func newCassandraSession(meta *CassandraMetadata, logger logr.Logger) (*gocql.Se
Password: meta.password,
}

if meta.enableTLS {
cluster.SslOpts = &gocql.SslOptions{
CertPath: meta.cert,
KeyPath: meta.key,
CaPath: meta.ca,
}
}

session, err := cluster.CreateSession()
if err != nil {
logger.Error(err, "found error creating session")
Expand Down Expand Up @@ -223,6 +320,23 @@ func (s *cassandraScaler) GetQueryResult(ctx context.Context) (int64, error) {

// Close closes the Cassandra session connection.
func (s *cassandraScaler) Close(_ context.Context) error {
// clean up any temporary files
if strings.TrimSpace(s.metadata.cert) != "" {
if err := os.Remove(s.metadata.cert); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.key) != "" {
if err := os.Remove(s.metadata.key); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.ca) != "" {
if err := os.Remove(s.metadata.ca); err != nil {
return err
}
}

if s.session != nil {
s.session.Close()
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/scalers/cassandra_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package scalers

import (
"context"
"fmt"
"os"
"testing"

"github.com/go-logr/logr"
Expand All @@ -16,6 +18,12 @@ type parseCassandraMetadataTestData struct {
authParams map[string]string
}

type parseCassandraTLSTestData struct {
authParams map[string]string
isError bool
enableTLS bool
}

type cassandraMetricIdentifier struct {
metadataTestData *parseCassandraMetadataTestData
triggerIndex int
Expand Down Expand Up @@ -47,6 +55,17 @@ var testCassandraMetadata = []parseCassandraMetadataTestData{
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "clusterIPAddress": "https://cassandra.test", "keyspace": "test_keyspace", "TriggerIndex": "0"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
}

var tlsAuthParamsTestData = []parseCassandraTLSTestData{
// success, TLS cert/key
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "password": "Y2Fzc2FuZHJhCg=="}, false, true},
// failure, TLS missing cert
{map[string]string{"tls": "enable", "key": "keey", "password": "Y2Fzc2FuZHJhCg=="}, true, false},
// failure, TLS missing key
{map[string]string{"tls": "enable", "cert": "ceert", "password": "Y2Fzc2FuZHJhCg=="}, true, false},
// failure, TLS invalid
{map[string]string{"tls": "yes", "cert": "ceert", "key": "keeey", "password": "Y2Fzc2FuZHJhCg=="}, true, false},
}

var cassandraMetricIdentifiers = []cassandraMetricIdentifier{
{&testCassandraMetadata[1], 0, "s0-cassandra-test_keyspace"},
{&testCassandraMetadata[2], 1, "s1-cassandra-test_keyspace"},
Expand Down Expand Up @@ -83,3 +102,56 @@ func TestCassandraGetMetricSpecForScaling(t *testing.T) {
}
}
}

func assertCertContents(testData parseCassandraTLSTestData, meta *CassandraMetadata, prop string) error {
if testData.authParams[prop] != "" {
var path string
switch prop {
case "cert":
path = meta.cert
case "key":
path = meta.key
}
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("expected to find '%v' file at %v", prop, path)
}
contents := string(data)
if contents != testData.authParams[prop] {
return fmt.Errorf("expected value: '%v' but got '%v'", testData.authParams[prop], contents)
}
}
return nil
}

var successMetaData = map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "TriggerIndex": "0"}

func TestParseCassandraTLS(t *testing.T) {
for _, testData := range tlsAuthParamsTestData {
meta, err := parseCassandraMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: successMetaData, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
}
if meta.enableTLS {
if meta.cert != testData.authParams["cert"] {
err := assertCertContents(testData, meta, "cert")
if err != nil {
t.Errorf(err.Error())
}
}
if meta.key != testData.authParams["key"] {
err := assertCertContents(testData, meta, "key")
if err != nil {
t.Errorf(err.Error())
}
}
}
}
}

0 comments on commit bd3c3cb

Please sign in to comment.