Skip to content

Commit

Permalink
Make Send/Write Loop context aware (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Ordiales Coscia authored and peterbourgon committed Feb 19, 2019
1 parent 10f464f commit e9fad41
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 53 deletions.
3 changes: 2 additions & 1 deletion metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ A gauge for the number of goroutines currently running, exported via StatsD.

```go
import (
"context"
"net"
"os"
"runtime"
Expand All @@ -81,7 +82,7 @@ func main() {
statsd := statsd.New("foo_svc.", log.NewNopLogger())
report := time.NewTicker(5 * time.Second)
defer report.Stop()
go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125")
go statsd.SendLoop(context.Background(), report.C, "tcp", "statsd.internal:8125")
goroutines := statsd.NewGauge("goroutine_count")
go exportGoroutines(goroutines)
// ...
Expand Down
18 changes: 12 additions & 6 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cloudwatch

import (
"context"
"fmt"
"os"
"strconv"
"sync"
"time"

Expand All @@ -14,7 +16,6 @@ import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
"strconv"
)

const (
Expand Down Expand Up @@ -136,13 +137,18 @@ func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// channel fires. This method blocks until ctx is canceled, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
for {
select {
case <-c:
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions metrics/cloudwatch2/cloudwatch2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package cloudwatch2

import (
"context"
"math"
"sync"
"time"
Expand Down Expand Up @@ -107,13 +108,18 @@ func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// channel fires. This method blocks until ctx is canceled, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
for {
select {
case <-c:
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions metrics/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package dogstatsd

import (
"context"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -109,24 +110,29 @@ func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
24 changes: 15 additions & 9 deletions metrics/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package graphite

import (
"context"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -83,24 +84,29 @@ func (g *Graphite) NewHistogram(name string, buckets int) *Histogram {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := g.WriteTo(w); err != nil {
g.logger.Log("during", "WriteTo", "err", err)
func (g *Graphite) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := g.WriteTo(w); err != nil {
g.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) {
g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger))
func (g *Graphite) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
g.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, g.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
14 changes: 10 additions & 4 deletions metrics/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package influx

import (
"context"
"time"

influxdb "github.com/influxdata/influxdb1-client/v2"
Expand Down Expand Up @@ -88,10 +89,15 @@ type BatchPointsWriter interface {
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
for range c {
if err := in.WriteTo(w); err != nil {
in.logger.Log("during", "WriteTo", "err", err)
func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) {
for {
select {
case <-c:
if err := in.WriteTo(w); err != nil {
in.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions metrics/influxstatsd/influxstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package influxstatsd

import (
"context"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -109,24 +110,29 @@ func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := d.WriteTo(w); err != nil {
d.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) {
d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down
2 changes: 1 addition & 1 deletion metrics/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// case "statsd":
// s := statsd.New(...)
// t := time.NewTicker(5*time.Second)
// go s.SendLoop(t.C, "tcp", "statsd.local:8125")
// go s.SendLoop(ctx, t.C, "tcp", "statsd.local:8125")
// latency = s.NewHistogram(...)
// requests = s.NewCounter(...)
// default:
Expand Down
24 changes: 15 additions & 9 deletions metrics/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package statsd

import (
"context"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -89,24 +90,29 @@ func (s *Statsd) NewTiming(name string, sampleRate float64) *Timing {
}

// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// time the passed channel fires. This method blocks until ctx is canceled,
// so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (s *Statsd) WriteLoop(c <-chan time.Time, w io.Writer) {
for range c {
if _, err := s.WriteTo(w); err != nil {
s.logger.Log("during", "WriteTo", "err", err)
func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
for {
select {
case <-c:
if _, err := s.WriteTo(w); err != nil {
s.logger.Log("during", "WriteTo", "err", err)
}
case <-ctx.Done():
return
}
}
}

// SendLoop is a helper method that wraps WriteLoop, passing a managed
// connection to the network and address. Like WriteLoop, this method blocks
// until the channel is closed, so clients probably want to start it in its own
// until ctx is canceled, so clients probably want to start it in its own
// goroutine. For typical usage, create a time.Ticker and pass its C channel to
// this method.
func (s *Statsd) SendLoop(c <-chan time.Time, network, address string) {
s.WriteLoop(c, conn.NewDefaultManager(network, address, s.logger))
func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger))
}

// WriteTo flushes the buffered content of the metrics to the writer, in
Expand Down

0 comments on commit e9fad41

Please sign in to comment.