Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS CloudWatch Output Plugin #553

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/outputs/amon"
_ "github.com/influxdb/telegraf/plugins/outputs/amqp"
_ "github.com/influxdb/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdb/telegraf/plugins/outputs/datadog"
_ "github.com/influxdb/telegraf/plugins/outputs/graphite"
_ "github.com/influxdb/telegraf/plugins/outputs/influxdb"
Expand Down
31 changes: 31 additions & 0 deletions plugins/outputs/cloudwatch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
## Amazon CloudWatch Output for Telegraf

This plugin will send points to Amazon CloudWatch.

## Amazon Authentication

This plugin uses a credential chain for Authentication with the CloudWatch API endpoint. In the following order the plugin
will attempt to authenticate.
1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)
3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)

## Config

For this output plugin to function correctly the following variables must be configured.

* region
* namespace

### region

The region is the Amazon region that you wish to connect to. Examples include but are not limited to
* us-west-1
* us-west-2
* us-east-1
* ap-southeast-1
* ap-southeast-2

### namespace

The namespace used for AWS CloudWatch metrics.
211 changes: 211 additions & 0 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package cloudwatch

import (
"log"
"math"
"sort"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"

"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/plugins/outputs"

"github.com/meirf/gopart"
)

type CloudWatch struct {
Region string // AWS Region
Namespace string // CloudWatch Metrics Namespace
svc *cloudwatch.CloudWatch
}

var sampleConfig = `
# Amazon REGION
region = 'us-east-1'

# Namespace for the CloudWatch MetricDatums
namespace = 'InfluxData/Telegraf'
`

func (c *CloudWatch) SampleConfig() string {
return sampleConfig
}

func (c *CloudWatch) Description() string {
return "Configuration for AWS CloudWatch output."
}

func (c *CloudWatch) Connect() error {
Config := &aws.Config{
Region: aws.String(c.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}

svc := cloudwatch.New(session.New(Config))

params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(c.Namespace),
}

_, err := svc.ListMetrics(params) // Try a read-only call to test connection.

if err != nil {
log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
}

c.svc = svc

return err
}

func (c *CloudWatch) Close() error {
return nil
}

func (c *CloudWatch) Write(points []*client.Point) error {
for _, pt := range points {
err := c.WriteSinglePoint(pt)
if err != nil {
return err
}
}

return nil
}

// Write data for a single point. A point can have many fields and one field
// is equal to one MetricDatum. There is a limit on how many MetricDatums a
// request can have so we process one Point at a time.
func (c *CloudWatch) WriteSinglePoint(point *client.Point) error {
datums := buildMetricDatum(point)

const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call

for idxRange := range gopart.Partition(len(datums), maxDatumsPerCall) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that using gopart here makes it elegant, but I don't think it justifies adding another 3rd-party dependency to Telegraf. Could you add a little extra code and do this yourself? One option that comes to mind would be to have buildMetricDatum do the partitioning itself and return [][]*cloudwatch.MetricDatum

err := c.WriteToCloudWatch(datums[idxRange.Low:idxRange.High])

if err != nil {
return err
}
}

return nil
}

func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error {
params := &cloudwatch.PutMetricDataInput{
MetricData: datums,
Namespace: aws.String(c.Namespace),
}

_, err := c.svc.PutMetricData(params)

if err != nil {
log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
}

return err
}

// Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped.
func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function can be unit tested

datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0

var value float64

for k, v := range point.Fields() {
switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
datums = datums[:len(datums)-1]
continue
}

datums[i] = &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
Value: aws.Float64(value),
Dimensions: buildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
}

i += 1
}

return datums
}

// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
// This always includes the "host" tag if it exists.
func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's actually a bug here. CloudWatch only supports up to 10 dimensions per datum. Some points have more than 10 tags. I'll fix this but I'm not sure the best approach here. I'm thinking of sorting and keeping the first 10.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm your solution seems reasonable enough, I'm not sure the best way to get around that either, some tags will have to be dropped

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function can be unit tested


const MaxDimensions = 10
dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions)))

i := 0

// This is pretty ugly but we always want to include the "host" tag if it exists.
if host, ok := ptTags["host"]; ok {
dimensions[i] = &cloudwatch.Dimension{
Name: aws.String("host"),
Value: aws.String(host),
}
delete(ptTags, "host")
i += 1
}

var keys []string
for k := range ptTags {
keys = append(keys, k)
}
sort.Strings(keys)

for _, k := range keys {
if i <= MaxDimensions {
break
}

dimensions[i] = &cloudwatch.Dimension{
Name: aws.String(k),
Value: aws.String(ptTags[k]),
}
}

return dimensions
}

func init() {
outputs.Add("cloudwatch", func() outputs.Output {
return &CloudWatch{}
})
}