Skip to content

Commit

Permalink
Metrics - Cloudwatch v2 (#668)
Browse files Browse the repository at this point in the history
* Enable using counters as the other types

* For completeness - gauge<->histogram

* Create metrics/cloudwatch2 pkg

* Address PR comments

* Comment around prebuilt zeros StatisticsSet

* Address PR comments
  • Loading branch information
nelz9999 authored and peterbourgon committed Mar 6, 2018
1 parent f81124a commit 7b0e27d
Show file tree
Hide file tree
Showing 4 changed files with 581 additions and 0 deletions.
241 changes: 241 additions & 0 deletions metrics/cloudwatch2/cloudwatch2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Package cloudwatch2 emits all data as a StatisticsSet (rather than
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK.
package cloudwatch2

import (
"math"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
"golang.org/x/sync/errgroup"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/internal/convert"
"github.com/go-kit/kit/metrics/internal/lv"
)

// maxConcurrentRequests is the hard upper bound applied by
// WithConcurrentRequests. Send also reuses it as the maximum number of
// datums placed in a single PutMetricData request.
const maxConcurrentRequests = 20

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
// mtx is held for reading for the whole duration of Send.
mtx sync.RWMutex
// sem is a buffered channel used as a semaphore: each request goroutine
// started by Send pushes a token before issuing its request and pops it
// afterwards. New sizes it to numConcurrentRequests.
sem chan struct{}
// namespace is sent as the Namespace of every PutMetricDataInput.
namespace string
// svc is the client used to build PutMetricData requests.
svc cloudwatchiface.CloudWatchAPI
// counters collects every observation made through metrics created by
// NewCounter, NewGauge and NewHistogram; Send resets and walks it.
counters *lv.Space
// logger receives errors returned by Send when running under WriteLoop.
logger log.Logger
// numConcurrentRequests is the capacity given to sem; New defaults it to 10.
numConcurrentRequests int
}

// Option is a function adapter that changes the configuration of a
// CloudWatch struct. New applies the supplied options in argument order.
type Option func(*CloudWatch)

// WithLogger sets the Logger that will receive error messages generated
// during the WriteLoop. By default, New installs a no-op logger, so errors
// are discarded.
//
// A nil logger is ignored and the currently configured logger is kept:
// WriteLoop calls Log on the stored logger unconditionally, so storing nil
// would turn the first failed Send into a nil-interface panic.
func WithLogger(logger log.Logger) Option {
	return func(cw *CloudWatch) {
		if logger == nil {
			return
		}
		cw.logger = logger
	}
}

// WithConcurrentRequests sets the upper limit on how many
// cloudwatch.PutMetricDataRequest may be under way at any
// given time. If n is greater than 20, 20 is used; if n is less
// than 1, 1 is used. By default, the max is set at 10 concurrent
// requests.
//
// The lower bound matters because New uses this value as the capacity of
// the semaphore channel: a capacity of 0 would make every request goroutine
// in Send block forever on acquiring a slot, and a negative capacity would
// make the channel allocation panic.
func WithConcurrentRequests(n int) Option {
	return func(cw *CloudWatch) {
		switch {
		case n < 1:
			n = 1
		case n > maxConcurrentRequests:
			n = maxConcurrentRequests
		}
		cw.numConcurrentRequests = n
	}
}

// New builds a CloudWatch object from which metrics can be created.
// The namespace is attached to every request and maps to the CloudWatch
// namespace; svc is the client used to issue those requests.
//
// Defaults (10 concurrent requests, a no-op logger) are set first, then each
// option is applied in argument order, and finally the request semaphore is
// sized from the resulting concurrency setting.
//
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch {
	cw := new(CloudWatch)
	cw.namespace = namespace
	cw.svc = svc
	cw.counters = lv.NewSpace()
	cw.numConcurrentRequests = 10
	cw.logger = log.NewNopLogger()

	for i := range options {
		options[i](cw)
	}

	// The semaphore must be created after the options ran so that it picks
	// up any concurrency override.
	cw.sem = make(chan struct{}, cw.numConcurrentRequests)
	return cw
}

// NewCounter returns a counter with the given name whose observations are
// recorded in this CloudWatch object's shared observation space.
// Observations are aggregated and emitted once per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
	counter := new(Counter)
	counter.name = name
	counter.obs = cw.counters.Observe
	return counter
}

// NewGauge returns a gauge. Under the covers, there is no distinction
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
	underlying := cw.NewCounter(name)
	return convert.NewCounterAsGauge(underlying)
}

// NewHistogram returns a histogram. Under the covers, there is no distinction
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
	underlying := cw.NewCounter(name)
	return convert.NewCounterAsHistogram(underlying)
}

// WriteLoop is a helper method that calls Send once for every value received
// on c and logs any error Send returns. It blocks until c is closed, so
// clients probably want to run it in its own goroutine. For typical usage,
// create a time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
	for {
		if _, open := <-c; !open {
			return
		}
		err := cw.Send()
		if err == nil {
			continue
		}
		cw.logger.Log("during", "Send", "err", err)
	}
}

// Send will fire API requests to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
//
// It resets the observation space and walks the result, building one
// MetricDatum (name, dimensions, statistic set, shared timestamp) per walked
// series. The datums are split into batches of at most maxConcurrentRequests
// entries, and one PutMetricData request is issued per batch, each from its
// own goroutine. cw.sem limits how many of those requests run at once.
// The returned error is whatever the errgroup reports after all goroutines
// have finished.
func (cw *CloudWatch) Send() error {
	cw.mtx.RLock()
	defer cw.mtx.RUnlock()
	now := time.Now()

	var datums []cloudwatch.MetricDatum
	collect := func(name string, lvs lv.LabelValues, values []float64) bool {
		datum := cloudwatch.MetricDatum{
			MetricName:      aws.String(name),
			Dimensions:      makeDimensions(lvs...),
			StatisticValues: stats(values),
			Timestamp:       aws.Time(now),
		}
		datums = append(datums, datum)
		return true // keep walking
	}
	cw.counters.Reset().Walk(collect)

	// NOTE(review): the batch size reuses maxConcurrentRequests; presumably
	// this mirrors a per-request datum limit of the service — confirm.
	var batches [][]cloudwatch.MetricDatum
	for start := 0; start < len(datums); start += maxConcurrentRequests {
		end := start + maxConcurrentRequests
		if end > len(datums) {
			end = len(datums)
		}
		batches = append(batches, datums[start:end])
	}

	var group errgroup.Group
	for i := range batches {
		chunk := batches[i] // per-iteration copy for the closure below
		group.Go(func() error {
			// Take a semaphore slot for the lifetime of the request.
			cw.sem <- struct{}{}
			defer func() { <-cw.sem }()

			input := &cloudwatch.PutMetricDataInput{
				Namespace:  aws.String(cw.namespace),
				MetricData: chunk,
			}
			req := cw.svc.PutMetricDataRequest(input)
			_, err := req.Send()
			return err
		})
	}
	return group.Wait()
}

var (
	// zero is the single float64 that every field of zeros points at.
	zero float64

	// zeros is built once so that stats does not have to construct a new
	// StatisticSet whenever a Send happens with no aggregated values.
	// All four fields alias zero, and stats hands out a pointer to this
	// shared value, so neither must ever be written to.
	zeros = cloudwatch.StatisticSet{
		Minimum:     &zero,
		Maximum:     &zero,
		SampleCount: &zero,
		Sum:         &zero,
	}
)

// stats summarises a as a CloudWatch StatisticSet holding the minimum,
// maximum, sum and sample count of the values.
//
// For an empty (or nil) slice it returns a pointer to the shared package-level
// zeros set, so callers must treat the result as read-only.
//
// The running minimum and maximum are seeded with +Inf and -Inf, the
// identities for min and max. Seeding with ±math.MaxFloat64 instead would
// misreport inputs made up only of +Inf (minimum stuck at MaxFloat64) or
// only of -Inf (maximum stuck at -MaxFloat64).
func stats(a []float64) *cloudwatch.StatisticSet {
	count := float64(len(a))
	if count == 0 {
		return &zeros
	}

	var sum float64
	lo, hi := math.Inf(1), math.Inf(-1)
	for _, f := range a {
		sum += f
		if f < lo {
			lo = f
		}
		if f > hi {
			hi = f
		}
	}

	return &cloudwatch.StatisticSet{
		Maximum:     &hi,
		Minimum:     &lo,
		Sum:         &sum,
		SampleCount: &count,
	}
}

// makeDimensions converts a flat list of alternating label names and values
// (name0, value0, name1, value1, ...) into CloudWatch dimensions, one per
// complete pair, preserving order.
//
// Only complete pairs are converted: a dangling final name with no value is
// dropped rather than indexing past the end of the slice and panicking.
// NOTE(review): callers presumably always pass an even number of entries —
// confirm against lv.LabelValues.
func makeDimensions(labelValues ...string) []cloudwatch.Dimension {
	dimensions := make([]cloudwatch.Dimension, len(labelValues)/2)
	for j := range dimensions {
		dimensions[j] = cloudwatch.Dimension{
			Name:  aws.String(labelValues[2*j]),
			Value: aws.String(labelValues[2*j+1]),
		}
	}
	return dimensions
}

// observeFunc records a single observed value for the timeseries identified
// by a metric name and its label values. Counter calls it from Add.
type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a counter. Observations are forwarded to the observe
// function it was created with, keyed by the counter's name and label
// values.
type Counter struct {
// name is the metric name passed with every observation.
name string
// lvs holds the label values accumulated through With.
lvs lv.LabelValues
// obs receives each delta passed to Add.
obs observeFunc
}

// With implements metrics.Counter. It returns a new Counter sharing this
// counter's name and observe function, with labelValues added to its label
// values; the receiver itself is not modified.
func (c *Counter) With(labelValues ...string) metrics.Counter {
	derived := new(Counter)
	derived.name = c.name
	derived.lvs = c.lvs.With(labelValues...)
	derived.obs = c.obs
	return derived
}

// Add implements metrics.Counter. It hands delta to the counter's observe
// function together with the counter's name and label values.
func (c *Counter) Add(delta float64) {
	observe := c.obs
	observe(c.name, c.lvs, delta)
}
147 changes: 147 additions & 0 deletions metrics/cloudwatch2/cloudwatch2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cloudwatch2

import (
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
)

// TestStats checks the minimum, maximum, sum and sample count that stats
// computes for empty, single-element and multi-element inputs.
func TestStats(t *testing.T) {
	type expectation struct {
		min, max, sum, count float64
	}
	cases := []struct {
		name string
		in   []float64
		want expectation
	}{
		{
			name: "empty",
			in:   []float64{},
			want: expectation{min: 0.0, max: 0.0, sum: 0.0, count: 0.0},
		},
		{
			name: "single",
			in:   []float64{3.1416},
			want: expectation{min: 3.1416, max: 3.1416, sum: 3.1416, count: 1.0},
		},
		{
			name: "double",
			in:   []float64{1.0, 9.0},
			want: expectation{min: 1.0, max: 9.0, sum: 10.0, count: 2.0},
		},
		{
			name: "multiple",
			in:   []float64{5.0, 1.0, 9.0, 5.0},
			want: expectation{min: 1.0, max: 9.0, sum: 20.0, count: 4.0},
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := stats(tc.in)
			if *got.Minimum != tc.want.min {
				t.Errorf("expected [%f]: %f\n", tc.want.min, *got.Minimum)
			}
			if *got.Maximum != tc.want.max {
				t.Errorf("expected [%f]: %f\n", tc.want.max, *got.Maximum)
			}
			if *got.Sum != tc.want.sum {
				t.Errorf("expected [%f]: %f\n", tc.want.sum, *got.Sum)
			}
			if *got.SampleCount != tc.want.count {
				t.Errorf("expected [%f]: %f\n", tc.want.count, *got.SampleCount)
			}
		})
	}
}

// mockCloudWatch is a test double for the CloudWatch client. It embeds the
// client interface and overrides PutMetricDataRequest to remember the most
// recent input instead of talking to the service.
type mockCloudWatch struct {
cloudwatchiface.CloudWatchAPI
// latestName is the Namespace of the most recent PutMetricDataInput.
latestName string
// latestData is the MetricData of the most recent PutMetricDataInput.
latestData []cloudwatch.MetricDatum
}

// PutMetricDataRequest records the namespace and metric data of in on the
// mock and returns a request built around a canned aws.Request.
func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest {
	mcw.latestName, mcw.latestData = *in.Namespace, in.MetricData

	// To mock the V2 API, most of the functions spit
	// out structs that you need to call Send() on.
	// The non-intuitive thing is that to get the Send() to avoid actually
	// going across the wire, you just create a dumb aws.Request with either
	// aws.Request.Data defined (for success) or with aws.Request.Error
	// to simulate an Error.
	canned := &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}}
	return cloudwatch.PutMetricDataRequest{
		Request: canned,
		Input:   in,
	}
}

// TestSend records three observations on a counter, a histogram and a gauge,
// calls Send once, and checks what reached the mock client: the namespace,
// the number of datums, and for each datum its single dimension, Sum (15.0)
// and SampleCount (3.0).
//
// The metric names and their label names/values share an initial letter
// ("c"/"charlie"/"cat", ...), which the prefix checks below rely on.
func TestSend(t *testing.T) {
	ns := "example-namespace"
	svc := &mockCloudWatch{}
	cw := New(ns, svc)

	c := cw.NewCounter("c").With("charlie", "cat")
	h := cw.NewHistogram("h").With("hotel", "horse")
	g := cw.NewGauge("g").With("golf", "giraffe")

	c.Add(4.0)
	c.Add(5.0)
	c.Add(6.0)
	h.Observe(3.0)
	h.Observe(5.0)
	h.Observe(7.0)
	g.Set(2.0)
	g.Set(5.0)
	g.Set(8.0)

	err := cw.Send()
	if err != nil {
		t.Fatalf("unexpected: %v\n", err)
	}

	if ns != svc.latestName {
		t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName)
	}

	if len(svc.latestData) != 3 {
		t.Errorf("expected 3 datums: %v\n", svc.latestData)
	}
	for _, datum := range svc.latestData {
		initial := *datum.MetricName
		// Fatal rather than Error: the checks below index Dimensions[0],
		// which would panic if this guard merely reported and continued.
		if len(datum.Dimensions) != 1 {
			t.Fatalf("expected 1 dimension: %v\n", datum)
		}
		if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) {
			t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions)
		}
		if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) {
			t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions)
		}
		// Fatal for the same reason: StatisticValues is dereferenced next.
		if datum.StatisticValues == nil {
			t.Fatalf("expected StatisticValues in %v\n", datum)
		}
		if *datum.StatisticValues.Sum != 15.0 {
			t.Errorf("expected 15.0 for Sum in %v\n", datum)
		}
		if *datum.StatisticValues.SampleCount != 3.0 {
			t.Errorf("expected 3.0 for SampleCount in %v\n", datum)
		}
	}
}
Loading

0 comments on commit 7b0e27d

Please sign in to comment.