Skip to content
This repository has been archived by the owner on May 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1 from awslabs/cross-account
Browse files Browse the repository at this point in the history
Enable cross account and region support
  • Loading branch information
rajeshkodali authored Aug 11, 2020
2 parents fef2835 + bbbb01a commit 30b075a
Show file tree
Hide file tree
Showing 14 changed files with 331 additions and 153 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ $(OUT_DIR)/adapter: $(src_deps)
docker-build: verify-apis test
cp deploy/Dockerfile $(TEMP_DIR)/Dockerfile

docker run -v $(TEMP_DIR):/build -v $(shell pwd):/go/src/github.com/awslabs/k8s-cloudwatch-adapter -e GOARCH=amd64 -e GOFLAGS="$(GOFLAGS)" -w /go/src/github.com/awslabs/k8s-cloudwatch-adapter $(GOIMAGE) /bin/bash -c "\
docker run --rm -v $(TEMP_DIR):/build -v $(shell pwd):/go/src/github.com/awslabs/k8s-cloudwatch-adapter -e GOARCH=amd64 -e GOFLAGS="$(GOFLAGS)" -w /go/src/github.com/awslabs/k8s-cloudwatch-adapter $(GOIMAGE) /bin/bash -c "\
CGO_ENABLED=0 GO111MODULE=on go build -o /build/adapter cmd/adapter/adapter.go"

docker build -t $(REGISTRY)/$(IMAGE):$(VERSION) $(TEMP_DIR)
Expand All @@ -44,7 +44,7 @@ else
endif

test:
CGO_ENABLED=0 GO111MODULE=on go test ./pkg/...
CGO_ENABLED=0 GO111MODULE=on go test -cover ./pkg/...

clean:
rm -rf ${OUT_DIR} vendor
Expand Down
32 changes: 16 additions & 16 deletions cmd/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ type CloudWatchAdapter struct {
basecmd.AdapterBase
}

func (a *CloudWatchAdapter) makeCloudWatchClient() (aws.Client, error) {
client := aws.NewCloudWatchClient()
return client, nil
func (a *CloudWatchAdapter) makeCloudWatchManager() (aws.CloudWatchManager, error) {
manager := aws.NewCloudWatchManager()
return manager, nil
}

func (a *CloudWatchAdapter) newController(metriccache *metriccache.MetricCache) (*controller.Controller, informers.SharedInformerFactory) {
func (a *CloudWatchAdapter) newController(cache *metriccache.MetricCache) (*controller.Controller, informers.SharedInformerFactory) {
clientConfig, err := a.ClientConfig()
if err != nil {
klog.Fatalf("unable to construct client config: %v", err)
Expand All @@ -42,14 +42,14 @@ func (a *CloudWatchAdapter) newController(metriccache *metriccache.MetricCache)
adapterInformerFactory := informers.NewSharedInformerFactory(adapterClientSet, time.Second*30)
handler := controller.NewHandler(
adapterInformerFactory.Metrics().V1alpha1().ExternalMetrics().Lister(),
metriccache)
cache)

controller := controller.NewController(adapterInformerFactory.Metrics().V1alpha1().ExternalMetrics(), &handler)
ctrl := controller.NewController(adapterInformerFactory.Metrics().V1alpha1().ExternalMetrics(), &handler)

return controller, adapterInformerFactory
return ctrl, adapterInformerFactory
}

func (a *CloudWatchAdapter) makeProvider(cwClient aws.Client, metriccache *metriccache.MetricCache) (provider.ExternalMetricsProvider, error) {
func (a *CloudWatchAdapter) makeProvider(cwManager aws.CloudWatchManager, cache *metriccache.MetricCache) (provider.ExternalMetricsProvider, error) {
client, err := a.DynamicClient()
if err != nil {
return nil, errors.Wrap(err, "unable to construct Kubernetes client")
Expand All @@ -60,7 +60,7 @@ func (a *CloudWatchAdapter) makeProvider(cwClient aws.Client, metriccache *metri
return nil, errors.Wrap(err, "unable to construct RESTMapper")
}

cwProvider := cwprov.NewCloudWatchProvider(client, mapper, cwClient, metriccache)
cwProvider := cwprov.NewCloudWatchProvider(client, mapper, cwManager, cache)
return cwProvider, nil
}

Expand All @@ -70,28 +70,28 @@ func main() {

// set up flags
cmd := &CloudWatchAdapter{}
cmd.Name = "cloudwatch-metrics-adapter"
cmd.Name = "k8s-cloudwatch-adapter"
cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags
cmd.Flags().Parse(os.Args)

stopCh := make(chan struct{})
defer close(stopCh)

metriccache := metriccache.NewMetricCache()
cache := metriccache.NewMetricCache()

// start and run contoller components
controller, adapterInformerFactory := cmd.newController(metriccache)
// start and run ctrl components
ctrl, adapterInformerFactory := cmd.newController(cache)
go adapterInformerFactory.Start(stopCh)
go controller.Run(2, time.Second, stopCh)
go ctrl.Run(2, time.Second, stopCh)

// create CloudWatch client
cwClient, err := cmd.makeCloudWatchClient()
cwClient, err := cmd.makeCloudWatchManager()
if err != nil {
klog.Fatalf("unable to construct CloudWatch client: %v", err)
}

// construct the provider
cwProvider, err := cmd.makeProvider(cwClient, metriccache)
cwProvider, err := cmd.makeProvider(cwClient, cache)
if err != nil {
klog.Fatalf("unable to construct CloudWatch metrics provider: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.33.5
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200323093244-5046ce1afe6b
github.com/pkg/errors v0.9.1
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v2 v2.2.8 // indirect
k8s.io/apimachinery v0.17.7
k8s.io/apiserver v0.17.7 // indirect
k8s.io/client-go v0.17.7
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/metrics/v1alpha1/externalmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type MetricSeriesSpec struct {
// Name specifies the series name.
Name string `json:"name"`

// RoleARN indicate the ARN of IAM role to assume, this metric will be retrieved using this role.
RoleARN string `json:"roleArn"`

// Region specifies the region where metrics should be retrieved.
Region string `json:"region"`

// Queries specify the CloudWatch metrics query to retrieve data for this series.
Queries []MetricDataQuery `json:"queries"`
}
Expand Down
49 changes: 35 additions & 14 deletions pkg/aws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,45 @@ import (
"os"
"time"

"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"github.com/aws/aws-sdk-go/aws/endpoints"

"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"k8s.io/klog"
)

// NewCloudWatchClient creates a new CloudWatch client.
func NewCloudWatchClient() Client {
func NewCloudWatchManager() CloudWatchManager {
return &cloudwatchManager{}
}

type cloudwatchManager struct {
}

func (c *cloudwatchManager) getClient(role, region string) *cloudwatch.CloudWatch {
// Using the Config value, create the CloudWatch client
sess := session.Must(session.NewSession())

// Using the SDK's default configuration, loading additional config
// and credentials values from the environment variables, shared
// credentials, and shared configuration files
cfg := aws.NewConfig()
cfg := aws.NewConfig().WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)

// check if roleARN is passed
if role != "" {
creds := stscreds.NewCredentials(sess, role)
cfg = cfg.WithCredentials(creds)
klog.Infof("using IAM role ARN: %s", role)
}

// check if region is set
if aws.StringValue(cfg.Region) == "" {
if region != "" {
cfg = cfg.WithRegion(region)
} else if aws.StringValue(cfg.Region) == "" {
cfg.Region = aws.String(GetLocalRegion())
}
klog.Infof("using AWS Region: %s", aws.StringValue(cfg.Region))
Expand All @@ -28,17 +52,14 @@ func NewCloudWatchClient() Client {
cfg = cfg.WithLogLevel(aws.LogDebugWithHTTPBody)
}

// Using the Config value, create the CloudWatch client
sess := session.Must(session.NewSession(cfg))
svc := cloudwatch.New(sess)
return &cloudwatchClient{client: svc}
}

type cloudwatchClient struct {
client *cloudwatch.CloudWatch
svc := cloudwatch.New(sess, cfg)
return svc
}

func (c *cloudwatchClient) QueryCloudWatch(cwQuery cloudwatch.GetMetricDataInput) ([]*cloudwatch.MetricDataResult, error) {
func (c *cloudwatchManager) QueryCloudWatch(request v1alpha1.ExternalMetric) ([]*cloudwatch.MetricDataResult, error) {
role := request.Spec.RoleARN
region := request.Spec.Region
cwQuery := toCloudWatchQuery(&request)
now := time.Now()
endTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
// CloudWatch metrics have latency, we will grab in a 5 minute window and extract the latest value
Expand All @@ -48,7 +69,7 @@ func (c *cloudwatchClient) QueryCloudWatch(cwQuery cloudwatch.GetMetricDataInput
cwQuery.StartTime = &startTime
cwQuery.ScanBy = aws.String("TimestampDescending")

req, resp := c.client.GetMetricDataRequest(&cwQuery)
req, resp := c.getClient(role, region).GetMetricDataRequest(&cwQuery)
req.SetContext(context.Background())

if err := req.Send(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/aws/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package aws

import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"
)

// Client represents a client for Amazon CloudWatch.
type Client interface {

// CloudWatchManager manages clients for Amazon CloudWatch.
type CloudWatchManager interface {
// Query sends a CloudWatch GetMetricDataInput to CloudWatch API for metric results.
QueryCloudWatch(query cloudwatch.GetMetricDataInput) ([]*cloudwatch.MetricDataResult, error)
QueryCloudWatch(request v1alpha1.ExternalMetric) ([]*cloudwatch.MetricDataResult, error)
}
50 changes: 50 additions & 0 deletions pkg/aws/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"io/ioutil"
"net/http"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"

"k8s.io/klog"
)

Expand All @@ -24,3 +28,49 @@ func GetLocalRegion() string {
// strip the last character from AZ to get region ID
return string(body[0 : len(body)-1])
}

func toCloudWatchQuery(externalMetric *v1alpha1.ExternalMetric) cloudwatch.GetMetricDataInput {
queries := externalMetric.Spec.Queries

cwMetricQueries := make([]*cloudwatch.MetricDataQuery, len(queries))
for i, q := range queries {
q := q
mdq := &cloudwatch.MetricDataQuery{
Id: &q.ID,
Label: &q.Label,
ReturnData: &q.ReturnData,
}

if len(q.Expression) == 0 {
dimensions := make([]*cloudwatch.Dimension, len(q.MetricStat.Metric.Dimensions))
for j := range q.MetricStat.Metric.Dimensions {
dimensions[j] = &cloudwatch.Dimension{
Name: &q.MetricStat.Metric.Dimensions[j].Name,
Value: &q.MetricStat.Metric.Dimensions[j].Value,
}
}

metric := &cloudwatch.Metric{
Dimensions: dimensions,
MetricName: &q.MetricStat.Metric.MetricName,
Namespace: &q.MetricStat.Metric.Namespace,
}

mdq.MetricStat = &cloudwatch.MetricStat{
Metric: metric,
Period: &q.MetricStat.Period,
Stat: &q.MetricStat.Stat,
Unit: aws.String(q.MetricStat.Unit),
}
} else {
mdq.Expression = &q.Expression
}

cwMetricQueries[i] = mdq
}
cwQuery := cloudwatch.GetMetricDataInput{
MetricDataQueries: cwMetricQueries,
}

return cwQuery
}
Loading

0 comments on commit 30b075a

Please sign in to comment.