Skip to content

Commit

Permalink
Support custom AWS endpoint (#3779)
Browse files Browse the repository at this point in the history
* Support custom AWS endpoint

Signed-off-by: Phan Duc <phan.duc@moneyforward.co.jp>

* Refactor aws sdk client creation

Signed-off-by: Phan Duc <phan.duc@moneyforward.co.jp>

* Add changelog for support aws endpoint

Signed-off-by: Phan Duc <phan.duc@moneyforward.co.jp>

* Move changelog to new section

Signed-off-by: Phan Duc <phan.duc@moneyforward.co.jp>

Signed-off-by: Phan Duc <phan.duc@moneyforward.co.jp>
  • Loading branch information
yuyuvn authored Oct 28, 2022
1 parent 83f882e commit 1d6d0f1
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 136 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- [v1.1.0](#v110)
- [v1.0.0](#v100)

## Unreleased

### New

- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588)
- **General:** Introduce new Loki Scaler ([#3699](https://github.com/kedacore/keda/issues/3699))
- **General**: Add ratelimitting parameters to keda manager to allow override of client defaults ([#3730](https://github.com/kedacore/keda/issues/2920))
- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler:** Support regex usage in queueName / subscriptionName parameters. ([#1624](https://github.com/kedacore/keda/issues/1624))
- **Selenium Grid Scaler:** Allow setting url trigger parameter from TriggerAuthentication/ClusterTriggerAuthentication ([#3752](https://github.com/kedacore/keda/pull/3752))
Expand Down
36 changes: 10 additions & 26 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
Expand Down Expand Up @@ -52,7 +49,8 @@ type awsCloudwatchMetadata struct {
metricStatPeriod int64
metricEndTimeOffset int64

awsRegion string
awsRegion string
awsEndpoint string

awsAuthorization awsAuthorizationMetadata

Expand Down Expand Up @@ -112,29 +110,11 @@ func getFloatMetadataValue(metadata map[string]string, key string, required bool
}

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
}))
sess, config := getAwsConfig(metadata.awsRegion,
metadata.awsEndpoint,
metadata.awsAuthorization)

var cloudwatchClient *cloudwatch.CloudWatch
if metadata.awsAuthorization.podIdentityOwner {
creds := credentials.NewStaticCredentials(metadata.awsAuthorization.awsAccessKeyID, metadata.awsAuthorization.awsSecretAccessKey, metadata.awsAuthorization.awsSessionToken)

if metadata.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, metadata.awsAuthorization.awsRoleArn)
}

cloudwatchClient = cloudwatch.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Credentials: creds,
})
} else {
cloudwatchClient = cloudwatch.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
})
}

return cloudwatchClient
return cloudwatch.New(sess, config)
}

func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, error) {
Expand Down Expand Up @@ -234,6 +214,10 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

meta.awsAuthorization, err = getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
"metricCollectionTime": "300",
"metricStat": "Average",
"metricStatPeriod": "300",
"awsRegion": "eu-west-1"},
"awsRegion": "eu-west-1",
"awsEndpoint": "http://localhost:4566"},
testAWSAuthentication, false,
"Properly formed cloudwatch query with optional parameters"},
// properly formed cloudwatch query but Region is empty
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package scalers

import "fmt"
import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
)

type awsAuthorizationMetadata struct {
awsRoleArn string
Expand All @@ -12,6 +19,43 @@ type awsAuthorizationMetadata struct {
podIdentityOwner bool
}

type awsConfigMetadata struct {
awsRegion string
awsEndpoint string
awsAuthorization awsAuthorizationMetadata
}

func getAwsConfig(awsRegion string, awsEndpoint string, awsAuthorization awsAuthorizationMetadata) (*session.Session, *aws.Config) {
metadata := &awsConfigMetadata{
awsRegion: awsRegion,
awsEndpoint: awsEndpoint,
awsAuthorization: awsAuthorization}

sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}))

if !metadata.awsAuthorization.podIdentityOwner {
return sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}
}

creds := credentials.NewStaticCredentials(metadata.awsAuthorization.awsAccessKeyID, metadata.awsAuthorization.awsSecretAccessKey, "")

if metadata.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, metadata.awsAuthorization.awsRoleArn)
}

return sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
}
}

func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (awsAuthorizationMetadata, error) {
meta := awsAuthorizationMetadata{}

Expand Down
39 changes: 10 additions & 29 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-logr/logr"
Expand All @@ -32,6 +29,7 @@ type awsDynamoDBScaler struct {
type awsDynamoDBMetadata struct {
tableName string
awsRegion string
awsEndpoint string
keyConditionExpression string
expressionAttributeNames map[string]*string
expressionAttributeValues map[string]*dynamodb.AttributeValue
Expand Down Expand Up @@ -76,6 +74,10 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["keyConditionExpression"]; ok && val != "" {
meta.keyConditionExpression = val
} else {
Expand Down Expand Up @@ -142,33 +144,12 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error
return &meta, nil
}

func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(meta.awsRegion),
}))

var dbClient *dynamodb.DynamoDB

if !meta.awsAuthorization.podIdentityOwner {
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
})

return dbClient
}

creds := credentials.NewStaticCredentials(meta.awsAuthorization.awsAccessKeyID, meta.awsAuthorization.awsSecretAccessKey, "")

if meta.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, meta.awsAuthorization.awsRoleArn)
}

dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
Credentials: creds,
})
func createDynamoDBClient(metadata *awsDynamoDBMetadata) *dynamodb.DynamoDB {
sess, config := getAwsConfig(metadata.awsRegion,
metadata.awsEndpoint,
metadata.awsAuthorization)

return dbClient
return dynamodb.New(sess, config)
}

func (s *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
Expand Down
30 changes: 30 additions & 0 deletions pkg/scalers/aws_dynamodb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@ var dynamoTestCases = []parseDynamoDBMetadataTestData{
},
},
},
{
name: "properly formed dynamo name and region with custom endpoint",
metadata: map[string]string{
"tableName": "test",
"awsRegion": "eu-west-1",
"awsEndpoint": "http://localhost:4566",
"keyConditionExpression": "#yr = :yyyy",
"expressionAttributeNames": "{ \"#yr\" : \"year\" }",
"expressionAttributeValues": "{\":yyyy\": {\"N\": \"1994\"}}",
"targetValue": "3",
},
authParams: testAWSDynamoAuthentication,
expectedError: nil,
expectedMetadata: &awsDynamoDBMetadata{
tableName: "test",
awsRegion: "eu-west-1",
awsEndpoint: "http://localhost:4566",
keyConditionExpression: "#yr = :yyyy",
expressionAttributeNames: map[string]*string{"#yr": &year},
expressionAttributeValues: map[string]*dynamodb.AttributeValue{":yyyy": &yearAttr},
targetValue: 3,
scalerIndex: 1,
metricName: "s1-aws-dynamodb-test",
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: "none",
awsSecretAccessKey: "none",
podIdentityOwner: true,
},
},
},
}

func TestParseDynamoMetadata(t *testing.T) {
Expand Down
39 changes: 11 additions & 28 deletions pkg/scalers/aws_dynamodb_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand Down Expand Up @@ -41,6 +37,7 @@ type awsDynamoDBStreamsMetadata struct {
activationTargetShardCount int64
tableName string
awsRegion string
awsEndpoint string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}
Expand Down Expand Up @@ -85,6 +82,10 @@ func parseAwsDynamoDBStreamsMetadata(config *ScalerConfig, logger logr.Logger) (
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
Expand Down Expand Up @@ -122,34 +123,16 @@ func parseAwsDynamoDBStreamsMetadata(config *ScalerConfig, logger logr.Logger) (
}

func createClientsForDynamoDBStreamsScaler(metadata *awsDynamoDBStreamsMetadata) (*dynamodb.DynamoDB, *dynamodbstreams.DynamoDBStreams) {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
}))
sess, config := getAwsConfig(metadata.awsRegion,
metadata.awsEndpoint,
metadata.awsAuthorization)

var dbClient *dynamodb.DynamoDB
var dbStreamClient *dynamodbstreams.DynamoDBStreams

if metadata.awsAuthorization.podIdentityOwner {
creds := credentials.NewStaticCredentials(metadata.awsAuthorization.awsAccessKeyID, metadata.awsAuthorization.awsSecretAccessKey, metadata.awsAuthorization.awsSessionToken)
if metadata.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, metadata.awsAuthorization.awsRoleArn)
}
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Credentials: creds,
})
dbStreamClient = dynamodbstreams.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Credentials: creds,
})
} else {
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
})
dbStreamClient = dynamodbstreams.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
})
}
dbClient = dynamodb.New(sess, config)
dbStreamClient = dynamodbstreams.New(sess, config)

return dbClient, dbStreamClient
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/scalers/aws_dynamodb_streams_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
testAWSDynamoDBStreamsSecretAccessKey = "none"
testAWSDynamoDBStreamsSessionToken = "none"
testAWSDynamoDBStreamsRegion = "ap-northeast-1"
testAWSDynamoDBStreamsEndpoint = "http://localhost:4566"
testAWSDynamoDBStreamsArnForSmallTable = "smallstreamarn"
testAWSDynamoDBStreamsArnForBigTable = "bigstreamarn"
testAWSDynamoDBStreamsErrorArn = "errorarn"
Expand Down Expand Up @@ -150,6 +151,31 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
comment: "properly formed dynamodb table name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"tableName": testAWSDynamoDBSmallTable,
"shardCount": "2",
"activationShardCount": "1",
"awsRegion": testAWSDynamoDBStreamsRegion,
"awsEndpoint": testAWSDynamoDBStreamsEndpoint},
authParams: testAWSKinesisAuthentication,
expected: &awsDynamoDBStreamsMetadata{
targetShardCount: 2,
activationTargetShardCount: 1,
tableName: testAWSDynamoDBSmallTable,
awsRegion: testAWSDynamoDBStreamsRegion,
awsEndpoint: testAWSDynamoDBStreamsEndpoint,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
awsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 0,
},
isError: false,
comment: "properly formed dynamodb table name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"tableName": "",
Expand Down
Loading

0 comments on commit 1d6d0f1

Please sign in to comment.