-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics - Cloudwatch v2 #668
Changes from all commits
7702703
ebc199f
f261280
7ba11af
9c0b674
904b106
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
// Package cloudwatch2 emits all data as a StatisticsSet (rather than | ||
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK. | ||
package cloudwatch2 | ||
|
||
import ( | ||
"math" | ||
"sync" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | ||
"golang.org/x/sync/errgroup" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/metrics" | ||
"github.com/go-kit/kit/metrics/internal/convert" | ||
"github.com/go-kit/kit/metrics/internal/lv" | ||
) | ||
|
||
const ( | ||
maxConcurrentRequests = 20 | ||
) | ||
|
||
// CloudWatch receives metrics observations and forwards them to CloudWatch. | ||
// Create a CloudWatch object, use it to create metrics, and pass those metrics as | ||
// dependencies to the components that will use them. | ||
// | ||
// To regularly report metrics to CloudWatch, use the WriteLoop helper method. | ||
type CloudWatch struct { | ||
mtx sync.RWMutex | ||
sem chan struct{} | ||
namespace string | ||
svc cloudwatchiface.CloudWatchAPI | ||
counters *lv.Space | ||
logger log.Logger | ||
numConcurrentRequests int | ||
} | ||
|
||
// Option is a function adapter to change config of the CloudWatch struct | ||
type Option func(*CloudWatch) | ||
|
||
// WithLogger sets the Logger that will recieve error messages generated | ||
// during the WriteLoop. By default, no logger is used. | ||
func WithLogger(logger log.Logger) Option { | ||
return func(cw *CloudWatch) { | ||
cw.logger = logger | ||
} | ||
} | ||
|
||
// WithConcurrentRequests sets the upper limit on how many | ||
// cloudwatch.PutMetricDataRequest may be under way at any | ||
// given time. If n is greater than 20, 20 is used. By default, | ||
// the max is set at 10 concurrent requests. | ||
func WithConcurrentRequests(n int) Option { | ||
return func(cw *CloudWatch) { | ||
if n > maxConcurrentRequests { | ||
n = maxConcurrentRequests | ||
} | ||
cw.numConcurrentRequests = n | ||
} | ||
} | ||
|
||
// New returns a CloudWatch object that may be used to create metrics. | ||
// Namespace is applied to all created metrics and maps to the CloudWatch namespace. | ||
// Callers must ensure that regular calls to Send are performed, either | ||
// manually or with one of the helper methods. | ||
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch { | ||
cw := &CloudWatch{ | ||
namespace: namespace, | ||
svc: svc, | ||
counters: lv.NewSpace(), | ||
numConcurrentRequests: 10, | ||
logger: log.NewNopLogger(), | ||
} | ||
|
||
for _, optFunc := range options { | ||
optFunc(cw) | ||
} | ||
|
||
cw.sem = make(chan struct{}, cw.numConcurrentRequests) | ||
|
||
return cw | ||
} | ||
|
||
// NewCounter returns a counter. Observations are aggregated and emitted once | ||
// per write invocation. | ||
func (cw *CloudWatch) NewCounter(name string) metrics.Counter { | ||
return &Counter{ | ||
name: name, | ||
obs: cw.counters.Observe, | ||
} | ||
} | ||
|
||
// NewGauge returns an gauge. Under the covers, there is no distinctions | ||
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this | ||
// just wraps a cloudwatch2.Counter. | ||
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { | ||
return convert.NewCounterAsGauge(cw.NewCounter(name)) | ||
} | ||
|
||
// NewHistogram returns a histogram. Under the covers, there is no distinctions | ||
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this | ||
// just wraps a cloudwatch2.Counter. | ||
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { | ||
return convert.NewCounterAsHistogram(cw.NewCounter(name)) | ||
} | ||
|
||
// WriteLoop is a helper method that invokes Send every time the passed | ||
// channel fires. This method blocks until the channel is closed, so clients | ||
// probably want to run it in its own goroutine. For typical usage, create a | ||
// time.Ticker and pass its C channel to this method. | ||
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { | ||
for range c { | ||
if err := cw.Send(); err != nil { | ||
cw.logger.Log("during", "Send", "err", err) | ||
} | ||
} | ||
} | ||
|
||
// Send will fire an API request to CloudWatch with the latest stats for | ||
// all metrics. It is preferred that the WriteLoop method is used. | ||
func (cw *CloudWatch) Send() error { | ||
cw.mtx.RLock() | ||
defer cw.mtx.RUnlock() | ||
now := time.Now() | ||
|
||
var datums []cloudwatch.MetricDatum | ||
|
||
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { | ||
datums = append(datums, cloudwatch.MetricDatum{ | ||
MetricName: aws.String(name), | ||
Dimensions: makeDimensions(lvs...), | ||
StatisticValues: stats(values), | ||
Timestamp: aws.Time(now), | ||
}) | ||
return true | ||
}) | ||
|
||
var batches [][]cloudwatch.MetricDatum | ||
for len(datums) > 0 { | ||
var batch []cloudwatch.MetricDatum | ||
lim := len(datums) | ||
if lim > maxConcurrentRequests { | ||
lim = maxConcurrentRequests | ||
} | ||
batch, datums = datums[:lim], datums[lim:] | ||
batches = append(batches, batch) | ||
} | ||
|
||
var g errgroup.Group | ||
for _, batch := range batches { | ||
batch := batch | ||
g.Go(func() error { | ||
cw.sem <- struct{}{} | ||
defer func() { | ||
<-cw.sem | ||
}() | ||
req := cw.svc.PutMetricDataRequest(&cloudwatch.PutMetricDataInput{ | ||
Namespace: aws.String(cw.namespace), | ||
MetricData: batch, | ||
}) | ||
_, err := req.Send() | ||
return err | ||
}) | ||
} | ||
return g.Wait() | ||
} | ||
|
||
var zero = float64(0.0) | ||
|
||
// Just build this once to reduce construction costs whenever | ||
// someone does a Send with no aggregated values. | ||
var zeros = cloudwatch.StatisticSet{ | ||
Maximum: &zero, | ||
Minimum: &zero, | ||
Sum: &zero, | ||
SampleCount: &zero, | ||
} | ||
|
||
func stats(a []float64) *cloudwatch.StatisticSet { | ||
count := float64(len(a)) | ||
if count == 0 { | ||
return &zeros | ||
} | ||
|
||
var sum float64 | ||
var min = math.MaxFloat64 | ||
var max = math.MaxFloat64 * -1 | ||
for _, f := range a { | ||
sum += f | ||
if f < min { | ||
min = f | ||
} | ||
if f > max { | ||
max = f | ||
} | ||
} | ||
|
||
return &cloudwatch.StatisticSet{ | ||
Maximum: &max, | ||
Minimum: &min, | ||
Sum: &sum, | ||
SampleCount: &count, | ||
} | ||
} | ||
|
||
func makeDimensions(labelValues ...string) []cloudwatch.Dimension { | ||
dimensions := make([]cloudwatch.Dimension, len(labelValues)/2) | ||
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ha! Wild. Nice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't take credit for this... I lifted this directly from the existing |
||
dimensions[j] = cloudwatch.Dimension{ | ||
Name: aws.String(labelValues[i]), | ||
Value: aws.String(labelValues[i+1]), | ||
} | ||
} | ||
return dimensions | ||
} | ||
|
||
type observeFunc func(name string, lvs lv.LabelValues, value float64) | ||
|
||
// Counter is a counter. Observations are forwarded to a node | ||
// object, and aggregated per timeseries. | ||
type Counter struct { | ||
name string | ||
lvs lv.LabelValues | ||
obs observeFunc | ||
} | ||
|
||
// With implements metrics.Counter. | ||
func (c *Counter) With(labelValues ...string) metrics.Counter { | ||
return &Counter{ | ||
name: c.name, | ||
lvs: c.lvs.With(labelValues...), | ||
obs: c.obs, | ||
} | ||
} | ||
|
||
// Add implements metrics.Counter. | ||
func (c *Counter) Add(delta float64) { | ||
c.obs(c.name, c.lvs, delta) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package cloudwatch2 | ||
|
||
import ( | ||
"strings" | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | ||
) | ||
|
||
func TestStats(t *testing.T) { | ||
testCases := []struct { | ||
name string | ||
vals []float64 | ||
xMin float64 | ||
xMax float64 | ||
xSum float64 | ||
xCt float64 | ||
}{ | ||
{ | ||
"empty", | ||
[]float64{}, | ||
0.0, | ||
0.0, | ||
0.0, | ||
0.0, | ||
}, | ||
{ | ||
"single", | ||
[]float64{3.1416}, | ||
3.1416, | ||
3.1416, | ||
3.1416, | ||
1.0, | ||
}, | ||
{ | ||
"double", | ||
[]float64{1.0, 9.0}, | ||
1.0, | ||
9.0, | ||
10.0, | ||
2.0, | ||
}, | ||
{ | ||
"multiple", | ||
[]float64{5.0, 1.0, 9.0, 5.0}, | ||
1.0, | ||
9.0, | ||
20.0, | ||
4.0, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
s := stats(tc.vals) | ||
if tc.xMin != *s.Minimum { | ||
t.Errorf("expected [%f]: %f\n", tc.xMin, *s.Minimum) | ||
} | ||
if tc.xMax != *s.Maximum { | ||
t.Errorf("expected [%f]: %f\n", tc.xMax, *s.Maximum) | ||
} | ||
if tc.xSum != *s.Sum { | ||
t.Errorf("expected [%f]: %f\n", tc.xSum, *s.Sum) | ||
} | ||
if tc.xCt != *s.SampleCount { | ||
t.Errorf("expected [%f]: %f\n", tc.xCt, *s.SampleCount) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
type mockCloudWatch struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😊 |
||
cloudwatchiface.CloudWatchAPI | ||
latestName string | ||
latestData []cloudwatch.MetricDatum | ||
} | ||
|
||
func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest { | ||
mcw.latestName = *in.Namespace | ||
mcw.latestData = in.MetricData | ||
return cloudwatch.PutMetricDataRequest{ | ||
// To mock the V2 API, most of the functions spit | ||
// out structs that you need to call Send() on. | ||
// The non-intuitive thing is that to get the Send() to avoid actually | ||
// going across the wire, you just create a dumb aws.Request with either | ||
// aws.Request.Data defined (for succes) or with aws.Request.Error | ||
// to simulate an Error. | ||
Request: &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}}, | ||
Input: in, | ||
} | ||
} | ||
|
||
func TestSend(t *testing.T) { | ||
ns := "example-namespace" | ||
svc := &mockCloudWatch{} | ||
cw := New(ns, svc) | ||
|
||
c := cw.NewCounter("c").With("charlie", "cat") | ||
h := cw.NewHistogram("h").With("hotel", "horse") | ||
g := cw.NewGauge("g").With("golf", "giraffe") | ||
|
||
c.Add(4.0) | ||
c.Add(5.0) | ||
c.Add(6.0) | ||
h.Observe(3.0) | ||
h.Observe(5.0) | ||
h.Observe(7.0) | ||
g.Set(2.0) | ||
g.Set(5.0) | ||
g.Set(8.0) | ||
|
||
err := cw.Send() | ||
if err != nil { | ||
t.Fatalf("unexpected: %v\n", err) | ||
} | ||
|
||
if ns != svc.latestName { | ||
t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName) | ||
} | ||
|
||
if len(svc.latestData) != 3 { | ||
t.Errorf("expected 3 datums: %v\n", svc.latestData) | ||
} | ||
for _, datum := range svc.latestData { | ||
initial := *datum.MetricName | ||
if len(datum.Dimensions) != 1 { | ||
t.Errorf("expected 1 dimension: %v\n", datum) | ||
} | ||
if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) { | ||
t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions) | ||
} | ||
if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) { | ||
t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions) | ||
} | ||
if datum.StatisticValues == nil { | ||
t.Errorf("expected StatisticValues in %v\n", datum) | ||
} | ||
if *datum.StatisticValues.Sum != 15.0 { | ||
t.Errorf("expected 15.0 for Sum in %v\n", datum) | ||
} | ||
if *datum.StatisticValues.SampleCount != 3.0 { | ||
t.Errorf("expected 3.0 for SampleCount in %v\n", datum) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package cloudwatchiface 🤦♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yah. It's supposed to be in support of mocking out their service... But it feels so icky.