Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics - Cloudwatch v2 #668

Merged
merged 6 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 249 additions & 0 deletions metrics/cloudwatch2/cloudwatch2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Package cloudwatch2 emits all data as a StatisticsSet (rather than
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK.
package cloudwatch2

import (
"math"
"os"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package cloudwatchiface 🤦‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah. It's supposed to be in support of mocking out their service... But it feels so icky.


"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/convert"
"github.com/go-kit/kit/metrics/internal/lv"
)

const (
maxConcurrentRequests = 20
)

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
sem chan struct{}
namespace string
svc cloudwatchiface.CloudWatchAPI
counters *lv.Space
logger log.Logger
numConcurrentRequests int
}

type option func(*CloudWatch)
Copy link
Member

@peterbourgon peterbourgon Mar 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is returned by exported functions, it's nicer to also make it exported. (And commented, etc.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


func (cw *CloudWatch) apply(opt option) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is never used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, yup. That's what I get for doin' a copypasta.

if opt != nil {
opt(cw)
}
}

// WithLogger sets the Logger that will recieve error messages generated
// during the WriteLoop
func WithLogger(logger log.Logger) option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it's nice if functional options describe the default behavior if the option isn't invoked. In this case, it might be nice to add: "By default, no logger is used."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing...

return func(cw *CloudWatch) {
cw.logger = logger
}
}

// WithConcurrentRequests sets the upper limit on how many
// cloudwatch.PutMetricDataRequest may be under way at any
// given time. If n is greater than 20, 20 is used.
func WithConcurrentRequests(n int) option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly: "By default, 10 concurrent requests are used."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding...

return func(cw *CloudWatch) {
if n > maxConcurrentRequests {
n = maxConcurrentRequests
}
cw.numConcurrentRequests = n
}
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
cw := &CloudWatch{
sem: nil, // set below
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this field be omitted since it's set below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

namespace: namespace,
svc: svc,
counters: lv.NewSpace(),
numConcurrentRequests: 10,
logger: log.NewLogfmtLogger(os.Stderr),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default should be NewNopLogger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

for _, optFunc := range options {
optFunc(cw)
}

cw.sem = make(chan struct{}, cw.numConcurrentRequests)

return cw
}

// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
return &Counter{
name: name,
obs: cw.counters.Observe,
}
}

// NewGauge returns an gauge. Under the covers, there is no distinctions
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
return convert.NewCounterAsGauge(cw.NewCounter(name))
}

// NewHistogram returns a histogram. Under the covers, there is no distinctions
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
return convert.NewCounterAsHistogram(cw.NewCounter(name))
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
}
}

// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
cw.mtx.RLock()
defer cw.mtx.RUnlock()
now := time.Now()

var datums []cloudwatch.MetricDatum

cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
datums = append(datums, cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
StatisticValues: stats(values),
Timestamp: aws.Time(now),
})
return true
})

var batches [][]cloudwatch.MetricDatum
for len(datums) > 0 {
var batch []cloudwatch.MetricDatum
lim := len(datums)
if lim > maxConcurrentRequests {
lim = maxConcurrentRequests
}
batch, datums = datums[:lim], datums[lim:]
batches = append(batches, batch)
}

var errors = make(chan error, len(batches))
for _, batch := range batches {
go func(batch []cloudwatch.MetricDatum) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the goroutines could run as an errorgroup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooh, cool! I hadn't yet added errorgroup to my toolbelt. Added!

cw.sem <- struct{}{}
defer func() {
<-cw.sem
}()
req := cw.svc.PutMetricDataRequest(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: batch,
})
_, err := req.Send()
errors <- err
}(batch)
}
var firstErr error
for i := 0; i < cap(errors); i++ {
if err := <-errors; err != nil && firstErr != nil {
firstErr = err
}
}

return firstErr
}

var zero = float64(0.0)
var zeros = cloudwatch.StatisticSet{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only used once, any reason it should be a package var?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intent was to avoid paying the "construction costs" every time a zero set is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... Should I do something different to make that apparent?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just a light comment could work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing!

Maximum: &zero,
Minimum: &zero,
Sum: &zero,
SampleCount: &zero,
}

func stats(a []float64) *cloudwatch.StatisticSet {
count := float64(len(a))
if count == 0 {
return &zeros
}

var sum float64
var min = math.MaxFloat64
var max = math.MaxFloat64 * -1
for _, f := range a {
sum += f
if f < min {
min = f
}
if f > max {
max = f
}
}

return &cloudwatch.StatisticSet{
Maximum: &max,
Minimum: &min,
Sum: &sum,
SampleCount: &count,
}
}

func makeDimensions(labelValues ...string) []cloudwatch.Dimension {
dimensions := make([]cloudwatch.Dimension, len(labelValues)/2)
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha! Wild. Nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't take credit for this... I lifted this directly from the existing cloudwatch package.

dimensions[j] = cloudwatch.Dimension{
Name: aws.String(labelValues[i]),
Value: aws.String(labelValues[i+1]),
}
}
return dimensions
}

type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a counter. Observations are forwarded to a node
// object, and aggregated per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}

// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}

// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}
147 changes: 147 additions & 0 deletions metrics/cloudwatch2/cloudwatch2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package cloudwatch2

import (
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
)

func TestStats(t *testing.T) {
testCases := []struct {
name string
vals []float64
xMin float64
xMax float64
xSum float64
xCt float64
}{
{
"empty",
[]float64{},
0.0,
0.0,
0.0,
0.0,
},
{
"single",
[]float64{3.1416},
3.1416,
3.1416,
3.1416,
1.0,
},
{
"double",
[]float64{1.0, 9.0},
1.0,
9.0,
10.0,
2.0,
},
{
"multiple",
[]float64{5.0, 1.0, 9.0, 5.0},
1.0,
9.0,
20.0,
4.0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s := stats(tc.vals)
if tc.xMin != *s.Minimum {
t.Errorf("expected [%f]: %f\n", tc.xMin, *s.Minimum)
}
if tc.xMax != *s.Maximum {
t.Errorf("expected [%f]: %f\n", tc.xMax, *s.Maximum)
}
if tc.xSum != *s.Sum {
t.Errorf("expected [%f]: %f\n", tc.xSum, *s.Sum)
}
if tc.xCt != *s.SampleCount {
t.Errorf("expected [%f]: %f\n", tc.xCt, *s.SampleCount)
}
})
}
}

type mockCloudWatch struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😊

cloudwatchiface.CloudWatchAPI
latestName string
latestData []cloudwatch.MetricDatum
}

func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest {
mcw.latestName = *in.Namespace
mcw.latestData = in.MetricData
return cloudwatch.PutMetricDataRequest{
// To mock the V2 API, most of the functions spit
// out structs that you need to call Send() on.
// The non-intuitive thing is that to get the Send() to avoid actually
// going across the wire, you just create a dumb aws.Request with either
// aws.Request.Data defined (for succes) or with aws.Request.Error
// to simulate an Error.
Request: &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}},
Input: in,
}
}

func TestSend(t *testing.T) {
ns := "example-namespace"
svc := &mockCloudWatch{}
cw := New(ns, svc)

c := cw.NewCounter("c").With("charlie", "cat")
h := cw.NewHistogram("h").With("hotel", "horse")
g := cw.NewGauge("g").With("golf", "giraffe")

c.Add(4.0)
c.Add(5.0)
c.Add(6.0)
h.Observe(3.0)
h.Observe(5.0)
h.Observe(7.0)
g.Set(2.0)
g.Set(5.0)
g.Set(8.0)

err := cw.Send()
if err != nil {
t.Fatalf("unexpected: %v\n", err)
}

if ns != svc.latestName {
t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName)
}

if len(svc.latestData) != 3 {
t.Errorf("expected 3 datums: %v\n", svc.latestData)
}
for _, datum := range svc.latestData {
initial := *datum.MetricName
if len(datum.Dimensions) != 1 {
t.Errorf("expected 1 dimension: %v\n", datum)
}
if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) {
t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions)
}
if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) {
t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions)
}
if datum.StatisticValues == nil {
t.Errorf("expected StatisticValues in %v\n", datum)
}
if *datum.StatisticValues.Sum != 15.0 {
t.Errorf("expected 15.0 for Sum in %v\n", datum)
}
if *datum.StatisticValues.SampleCount != 3.0 {
t.Errorf("expected 3.0 for SampleCount in %v\n", datum)
}
}
}
Loading