Skip to content

Commit

Permalink
Support AWS signing for ElasticSearch client
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Feb 1, 2021
1 parent b22be7c commit c7d2f10
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
41 changes: 41 additions & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import (
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/olivere/elastic"
esaws "github.com/olivere/elastic/aws/v4"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -99,6 +104,22 @@ func NewV6Client(
if connectConfig.DisableHealthCheck {
clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false))
}
if connectConfig.AWSSigning.Enable {
if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil {
return nil, err
}
var signingClient *http.Client
var err error
if connectConfig.AWSSigning.EnvironmentCredential != nil {
signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV6(*connectConfig.AWSSigning.EnvironmentCredential)
} else {
signingClient, err = buildSigningHTTPClientFromStaticCredentialV6(*connectConfig.AWSSigning.StaticCredential)
}
if err != nil {
return nil, err
}
clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient))
}
client, err := elastic.NewClient(clientOptFuncs...)
if err != nil {
return nil, err
Expand All @@ -111,6 +132,26 @@ func NewV6Client(
}, nil
}

// Refer to https://github.com/olivere/elastic/blob/release-branch.v6/recipes/aws-connect-v4/main.go
func buildSigningHTTPClientFromStaticCredentialV6(credentialConfig config.AWSStaticCredential) (*http.Client, error) {
awsCredentials := credentials.NewStaticCredentials(
credentialConfig.AccessKey,
credentialConfig.SecretKey,
credentialConfig.SessionToken,
)
return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil
}

func buildSigningHTTPClientFromEnvironmentCredentialV6(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(credentialConfig.Region)},
)
if err != nil {
return nil, err
}
return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil
}

// root is for nested object like Attr property for search attributes.
func (c *elasticV6) PutMapping(ctx context.Context, index, root, key, valueType string) error {
body := buildPutMappingBodyV6(root, key, valueType)
Expand Down
41 changes: 41 additions & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import (
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/olivere/elastic/v7"
esaws "github.com/olivere/elastic/v7/aws/v4"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -92,6 +97,22 @@ func NewV7Client(
if connectConfig.DisableHealthCheck {
clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false))
}
if connectConfig.AWSSigning.Enable {
if err := config.CheckAWSSigningConfig(connectConfig.AWSSigning); err != nil {
return nil, err
}
var signingClient *http.Client
var err error
if connectConfig.AWSSigning.EnvironmentCredential != nil {
signingClient, err = buildSigningHTTPClientFromEnvironmentCredentialV7(*connectConfig.AWSSigning.EnvironmentCredential)
} else {
signingClient, err = buildSigningHTTPClientFromStaticCredentialV7(*connectConfig.AWSSigning.StaticCredential)
}
if err != nil {
return nil, err
}
clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(signingClient))
}
client, err := elastic.NewClient(clientOptFuncs...)
if err != nil {
return nil, err
Expand All @@ -104,6 +125,26 @@ func NewV7Client(
}, nil
}

// refer to https://github.com/olivere/elastic/blob/release-branch.v7/recipes/aws-connect-v4/main.go
func buildSigningHTTPClientFromStaticCredentialV7(credentialConfig config.AWSStaticCredential) (*http.Client, error) {
awsCredentials := credentials.NewStaticCredentials(
credentialConfig.AccessKey,
credentialConfig.SecretKey,
credentialConfig.SessionToken,
)
return esaws.NewV4SigningClient(awsCredentials, credentialConfig.Region), nil
}

func buildSigningHTTPClientFromEnvironmentCredentialV7(credentialConfig config.AWSEnvironmentCredential) (*http.Client, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(credentialConfig.Region)},
)
if err != nil {
return nil, err
}
return esaws.NewV4SigningClient(sess.Config.Credentials, credentialConfig.Region), nil
}

func (c *elasticV7) IsNotFoundError(err error) bool {
if elastic.IsNotFound(err) {
return true
Expand Down
43 changes: 43 additions & 0 deletions common/service/config/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
package config

import (
"fmt"
"net/url"

"github.com/uber/cadence/common"
)

var errAWSSigningCredential = fmt.Errorf("must provide exactly one type of credential, EnvironmentCredential or StaticCredential")

// ElasticSearchConfig for connecting to ElasticSearch
type (
ElasticSearchConfig struct {
Expand All @@ -42,6 +45,35 @@ type (
DisableSniff bool `yaml:"disableSniff"`
// optional to disable health check
DisableHealthCheck bool `yaml:"disableHealthCheck"`
// optional to use AWS signing client
// See more info https://github.com/olivere/elastic/wiki/Using-with-AWS-Elasticsearch-Service
AWSSigning AWSSigning `yaml:"awsSigning"`
}

// AWSSigning contains config to enable signing,
// Must provide either StaticCredential or EnvironmentCredential
AWSSigning struct {
Enable bool `yaml:"enable"`
StaticCredential *AWSStaticCredential `yaml:"staticCredential"`
EnvironmentCredential *AWSEnvironmentCredential `yaml:"environmentCredential"`
}

// AWSStaticCredential to create a static credentials value provider.
// SessionToken is only required for temporary security credentials retrieved via STS,
// otherwise an empty string can be passed for this parameter.
// See more in https://github.com/aws/aws-sdk-go/blob/master/aws/credentials/static_provider.go#L21
AWSStaticCredential struct {
AccessKey string `yaml:"accessKey"`
SecretKey string `yaml:"secretKey"`
Region string `yaml:"region"`
SessionToken string `yaml:"sessionToken"`
}

// AWSEnvironmentCredential will make a new Session created from SDK defaults, config files,
// environment, and user provided config files.
// See more in https://github.com/aws/aws-sdk-go/blob/3974dd034387fbc7cf09c8cd2400787ce07f3285/aws/session/session.go#L147
AWSEnvironmentCredential struct {
Region string `yaml:"region"`
}
)

Expand All @@ -58,3 +90,14 @@ func (cfg *ElasticSearchConfig) SetUsernamePassword() {
cfg.URL.User = url.UserPassword(cfg.Username, cfg.Password)
}
}

// CheckAWSSigningConfig checks if the AWSSigning configuration is valid
func CheckAWSSigningConfig(config AWSSigning) error {
if config.EnvironmentCredential == nil && config.StaticCredential == nil {
return errAWSSigningCredential
}
if config.EnvironmentCredential != nil && config.StaticCredential != nil {
return errAWSSigningCredential
}
return nil
}

0 comments on commit c7d2f10

Please sign in to comment.