diff --git a/common/elasticsearch/client_v6.go b/common/elasticsearch/client_v6.go index b8c868cc135..b8c1b9474fc 100644 --- a/common/elasticsearch/client_v6.go +++ b/common/elasticsearch/client_v6.go @@ -26,10 +26,15 @@ import ( "fmt" "io" "math" + "net/http" "strconv" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" "github.com/olivere/elastic" + esaws "github.com/olivere/elastic/aws/v4" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" @@ -99,6 +104,22 @@ func NewV6Client( if connectConfig.DisableHealthCheck { clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) } + if connectConfig.AWSSigning.Enable { + if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil { + return nil, err + } + var signingClient *http.Client + var err error + if connectConfig.AWSSigning.EnvironmentCredential != nil { + signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV6(*connectConfig.AWSSigning.EnvironmentCredential) + } else { + signingClient, err = buildSigningHTTPClientFromStaticCredentialV6(*connectConfig.AWSSigning.StaticCredential) + } + if err != nil { + return nil, err + } + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient)) + } client, err := elastic.NewClient(clientOptFuncs...) if err != nil { return nil, err @@ -111,6 +132,26 @@ func NewV6Client( }, nil } +// Refer to https://github.com/olivere/elastic/blob/release-branch.v6/recipes/aws-connect-v4/main.go +func buildSigningHTTPClientFromStaticCredentialV6(credentialConfig config.AWSStaticCredential) (*http.Client, error) { + awsCredentials := credentials.NewStaticCredentials( + credentialConfig.AccessKey, + credentialConfig.SecretKey, + credentialConfig.SessionToken, + ) + return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil +} + +func buildSigningHTTPClientFromEnvironmentCredentialV6(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(credentialConfig.Region)}, + ) + if err != nil { + return nil, err + } + return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil +} + // root is for nested object like Attr property for search attributes. func (c *elasticV6) PutMapping(ctx context.Context, index, root, key, valueType string) error { body := buildPutMappingBodyV6(root, key, valueType) diff --git a/common/elasticsearch/client_v7.go b/common/elasticsearch/client_v7.go index 463f150df99..5806f55099c 100644 --- a/common/elasticsearch/client_v7.go +++ b/common/elasticsearch/client_v7.go @@ -26,10 +26,15 @@ import ( "fmt" "io" "math" + "net/http" "strconv" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" "github.com/olivere/elastic/v7" + esaws "github.com/olivere/elastic/v7/aws/v4" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" @@ -92,6 +97,22 @@ func NewV7Client( if connectConfig.DisableHealthCheck { clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) } + if connectConfig.AWSSigning.Enable { + if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil { + return nil, err + } + var signingClient *http.Client + var err error + if connectConfig.AWSSigning.EnvironmentCredential != nil { + signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV7(*connectConfig.AWSSigning.EnvironmentCredential) + } else { + signingClient, err = buildSigningHTTPClientFromStaticCredentialV7(*connectConfig.AWSSigning.StaticCredential) + } + if err != nil { + return nil, err + } + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient)) + } client, err := elastic.NewClient(clientOptFuncs...) if err != nil { return nil, err @@ -104,6 +125,26 @@ func NewV7Client( }, nil } +// refer to https://github.com/olivere/elastic/blob/release-branch.v7/recipes/aws-connect-v4/main.go +func buildSigningHTTPClientFromStaticCredentialV7(credentialConfig config.AWSStaticCredential) (*http.Client, error) { + awsCredentials := credentials.NewStaticCredentials( + credentialConfig.AccessKey, + credentialConfig.SecretKey, + credentialConfig.SessionToken, + ) + return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil +} + +func buildSigningHTTPClientFromEnvironmentCredentialV7(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(credentialConfig.Region)}, + ) + if err != nil { + return nil, err + } + return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil +} + func (c *elasticV7) IsNotFoundError(err error) bool { if elastic.IsNotFound(err) { return true diff --git a/common/service/config/elasticsearch.go b/common/service/config/elasticsearch.go index b6f608a2d86..c14b7c34903 100644 --- a/common/service/config/elasticsearch.go +++ b/common/service/config/elasticsearch.go @@ -21,11 +21,14 @@ package config import ( + "fmt" "net/url" "github.com/uber/cadence/common" ) +var errAWSSigningCredential = fmt.Errorf("must provide exactly one type of credential, EnvironmentCredential or StaticCredential") + // ElasticSearchConfig for connecting to ElasticSearch type ( ElasticSearchConfig struct { @@ -42,6 +45,35 @@ type ( DisableSniff bool `yaml:"disableSniff"` // optional to disable health check DisableHealthCheck bool `yaml:"disableHealthCheck"` + // optional to use AWS signing client + // See more info https://github.com/olivere/elastic/wiki/Using-with-AWS-Elasticsearch-Service + AWSSigning AWSSigning `yaml:"awsSigning"` + } + + // AWSSigning contains config to enable signing, + // Must provide either StaticCredential or EnvironmentCredential + AWSSigning struct { + Enable bool `yaml:"enable"` + StaticCredential *AWSStaticCredential `yaml:"staticCredential"` + EnvironmentCredential *AWSEnvironmentCredential `yaml:"environmentCredential"` + } + + // AWSStaticCredential to create a static credentials value provider. + // SessionToken is only required for temporary security credentials retrieved via STS, + // otherwise an empty string can be passed for this parameter. + // See more in https://github.com/aws/aws-sdk-go/blob/master/aws/credentials/static_provider.go#L21 + AWSStaticCredential struct { + AccessKey string `yaml:"accessKey"` + SecretKey string `yaml:"secretKey"` + Region string `yaml:"region"` + SessionToken string `yaml:"sessionToken"` + } + + // AWSEnvironmentCredential will make a new Session created from SDK defaults, config files, + // environment, and user provided config files. + // See more in https://github.com/aws/aws-sdk-go/blob/3974dd034387fbc7cf09c8cd2400787ce07f3285/aws/session/session.go#L147 + AWSEnvironmentCredential struct { + Region string `yaml:"region"` } ) @@ -58,3 +90,14 @@ func (cfg *ElasticSearchConfig) SetUsernamePassword() { cfg.URL.User = url.UserPassword(cfg.Username, cfg.Password) } } + +// CheckAWSSigningConfig checks if the AWSSigning configuration is valid +func CheckAWSSigningConfig(config AWSSigning) error { + if config.EnvironmentCredential == nil && config.StaticCredential == nil { + return errAWSSigningCredential + } + if config.EnvironmentCredential != nil && config.StaticCredential != nil { + return errAWSSigningCredential + } + return nil +}