From 9677a0faabe7491dc6e11d3fd55067720a5efa3b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 8 Sep 2015 19:06:19 -0700 Subject: [PATCH] Add collectd stats --- CHANGELOG.md | 1 + services/collectd/service.go | 39 +++++++++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2991bdb5d54..879368b3e24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ With this release InfluxDB is moving to Go 1.5. ### Features +- [#4050](https://github.com/influxdb/influxdb/pull/4050): Add stats to collectd - [#3771](https://github.com/influxdb/influxdb/pull/3771): Close idle Graphite TCP connections - [#3755](https://github.com/influxdb/influxdb/issues/3755): Add option to build script. Thanks @fg2it - [#3863](https://github.com/influxdb/influxdb/pull/3863): Move to Go 1.5 diff --git a/services/collectd/service.go b/services/collectd/service.go index b0fee0bc49b..7d8e65f820c 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -1,13 +1,16 @@ package collectd import ( + "expvar" "fmt" "log" "net" "os" + "strings" "sync" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/tsdb" @@ -16,6 +19,17 @@ import ( const leaderWaitTimeout = 30 * time.Second +// statistics gathered by the collectd service. +const ( + statPointsReceived = "points_rx" + statBytesReceived = "bytes_rx" + statPointsParseFail = "points_parse_fail" + statReadFail = "read_fail" + statBatchesTrasmitted = "batches_tx" + statPointsTransmitted = "points_tx" + statBatchesTransmitFail = "batches_tx_fail" +) + // pointsWriter is an internal interface to make testing easier. type pointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error @@ -42,6 +56,9 @@ type Service struct { batcher *tsdb.PointBatcher typesdb gollectd.Types addr net.Addr + + // expvar-based stats. + statMap *expvar.Map } // NewService returns a new instance of the collectd service. @@ -59,6 +76,12 @@ func NewService(c Config) *Service { func (s *Service) Open() error { s.Logger.Printf("Starting collectd service") + // Configure expvar monitoring. It's OK to do this even if the service fails to open and + // should be done before any data could arrive for the service. + key := strings.Join([]string{"collectd", s.Config.BindAddress}, ":") + tags := map[string]string{"bind": s.Config.BindAddress} + s.statMap = influxdb.NewStatistics(key, "collectd", tags) + if s.Config.BindAddress == "" { return fmt.Errorf("bind address is blank") } else if s.Config.Database == "" { @@ -182,10 +205,12 @@ func (s *Service) serve() { n, _, err := s.ln.ReadFromUDP(buffer) if err != nil { + s.statMap.Add(statReadFail, 1) s.Logger.Printf("collectd ReadFromUDP error: %s", err) continue } if n > 0 { + s.statMap.Add(statBytesReceived, int64(n)) s.handleMessage(buffer[:n]) } } @@ -194,6 +219,7 @@ func (s *Service) serve() { func (s *Service) handleMessage(buffer []byte) { packets, err := gollectd.Packets(buffer, s.typesdb) if err != nil { + s.statMap.Add(statPointsParseFail, 1) s.Logger.Printf("Collectd parse error: %s", err) return } @@ -202,6 +228,7 @@ func (s *Service) handleMessage(buffer []byte) { for _, p := range points { s.batcher.In() <- p } + s.statMap.Add(statPointsReceived, int64(len(points))) } } @@ -213,15 +240,17 @@ func (s *Service) writePoints() { case <-s.stop: return case batch := <-s.batcher.Out(): - req := &cluster.WritePointsRequest{ + if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{ Database: s.Config.Database, RetentionPolicy: s.Config.RetentionPolicy, ConsistencyLevel: cluster.ConsistencyLevelAny, Points: batch, - } - if err := s.PointsWriter.WritePoints(req); err != nil { - s.Logger.Printf("failed to write batch: %s", err) - continue + }); err == nil { + s.statMap.Add(statBatchesTrasmitted, 1) + s.statMap.Add(statPointsTransmitted, int64(len(batch))) + } else { + s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err) + s.statMap.Add(statBatchesTransmitFail, 1) } } }