Skip to content

Commit

Permalink
Utilizing new client and overhauling Accumulator interface
Browse files Browse the repository at this point in the history
Fixes #280
Fixes #281
Fixes #289
  • Loading branch information
sparrc committed Oct 20, 2015
1 parent 6263bc2 commit 200cd2e
Show file tree
Hide file tree
Showing 25 changed files with 466 additions and 529 deletions.
21 changes: 12 additions & 9 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ type Plugin interface {
}

type Accumulator interface {
Add(measurement string, value interface{}, tags map[string]string)
AddFieldsWithTime(measurement string,
values map[string]interface{},
Add(measurement string,
value interface{},
tags map[string]string,
timestamp time.Time)
timestamp ...time.Time)
AddFields(measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time)
}
```

Expand Down Expand Up @@ -81,8 +84,8 @@ func Gather(acc plugins.Accumulator) error {
"pid": fmt.Sprintf("%d", process.Pid),
}

acc.Add("cpu", process.CPUTime, tags)
acc.Add("memory", process.MemoryBytes, tags)
acc.Add("cpu", process.CPUTime, tags, time.Now())
acc.Add("memory", process.MemoryBytes, tags, time.Now())
}
}
```
Expand Down Expand Up @@ -179,7 +182,7 @@ type Output interface {
Close() error
Description() string
SampleConfig() string
Write(client.BatchPoints) error
Write(points []*client.Point) error
}
```

Expand Down Expand Up @@ -214,8 +217,8 @@ func (s *Simple) Close() error {
return nil
}

func (s *Simple) Write(bp client.BatchPoints) error {
for _, pt := range bp {
func (s *Simple) Write(points []*client.Point) error {
for _, pt := range points {
// write `pt` to the output sink here
}
return nil
Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ ifeq ($(UNAME), Linux)
ADVERTISED_HOST=localhost docker-compose --file scripts/docker-compose.yml up -d
endif

test: prepare docker-compose
$(GOBIN)/godep go test ./...
test: test-cleanup prepare docker-compose
# Sleeping for kafka leadership election, TSDB setup, etc.
sleep 30
# Setup SUCCESS, running tests
godep go test ./...

test-short: prepare
$(GOBIN)/godep go test -short ./...
Expand Down
210 changes: 80 additions & 130 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,188 +2,138 @@ package telegraf

import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/client/v2"
)

// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
sync.Mutex

client.BatchPoints
type Accumulator interface {
Add(measurement string, value interface{},
tags map[string]string, t ...time.Time)
AddFields(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)

Debug bool
SetDefaultTags(tags map[string]string)
AddDefaultTag(key, value string)

Prefix string
Prefix() string
SetPrefix(prefix string)

Config *ConfiguredPlugin
Debug() bool
SetDebug(enabled bool)
}

// deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.Lock()
defer bp.Unlock()

var bpc BatchPoints
bpc.Time = bp.Time
bpc.Precision = bp.Precision

bpc.Tags = make(map[string]string)
for k, v := range bp.Tags {
bpc.Tags[k] = v
}
func NewAccumulator(
plugin *ConfiguredPlugin,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
acc.points = points
acc.plugin = plugin
return &acc
}

var pts []client.Point
for _, pt := range bp.Points {
var ptc client.Point
type accumulator struct {
sync.Mutex

ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
ptc.Precision = pt.Precision
ptc.Raw = pt.Raw
points chan *client.Point

ptc.Tags = make(map[string]string)
ptc.Fields = make(map[string]interface{})
defaultTags map[string]string

for k, v := range pt.Tags {
ptc.Tags[k] = v
}
debug bool

for k, v := range pt.Fields {
ptc.Fields[k] = v
}
pts = append(pts, ptc)
}
plugin *ConfiguredPlugin

bpc.Points = pts
return &bpc
prefix string
}

// Add adds a measurement
func (bp *BatchPoints) Add(
func (ac *accumulator) Add(
measurement string,
val interface{},
value interface{},
tags map[string]string,
t ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = val
bp.AddFields(measurement, fields, tags)
fields["value"] = value
ac.AddFields(measurement, fields, tags, t...)
}

// AddFieldsWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddFieldsWithTime(
func (ac *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp time.Time,
t ...time.Time,
) {
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.Lock()
// defer bp.Unlock()

// measurement = bp.Prefix + measurement

// if bp.Config != nil {
// if !bp.Config.ShouldPass(measurement, tags) {
// return
// }
// }

// if bp.Debug {
// var tg []string

// for k, v := range tags {
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
// }

// var vals []string

// for k, v := range fields {
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
// }

// sort.Strings(tg)
// sort.Strings(vals)

// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
// }

// bp.Points = append(bp.Points, client.Point{
// Measurement: measurement,
// Tags: tags,
// Fields: fields,
// Time: timestamp,
// })
}

// AddFields will eventually replace the Add function, once we move to having a
// single plugin as a single measurement with multiple fields
func (bp *BatchPoints) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
) {
bp.Lock()
defer bp.Unlock()
if tags == nil {
tags = make(map[string]string)
}

// InfluxDB does not support writing uint64
// InfluxDB client/points does not support writing uint64
// TODO fix when it does
// https://github.com/influxdb/influxdb/pull/4508
for k, v := range fields {
switch val := v.(type) {
case uint64:
if val < uint64(9223372036854775808) {
fields[k] = int64(val)
} else {
fields[k] = int64(9223372036854775807)
}
}
}

measurement = bp.Prefix + measurement
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}

if bp.Config != nil {
if !bp.Config.ShouldPass(measurement, tags) {
if ac.plugin != nil {
if !ac.plugin.ShouldPass(measurement, tags) {
return
}
}

// Apply BatchPoints tags to tags passed in, giving precedence to those
// passed in. This is so that plugins have the ability to override global
// tags.
for k, v := range bp.Tags {
_, ok := tags[k]
if !ok {
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

if bp.Debug {
var tg []string
if ac.prefix != "" {
measurement = ac.prefix + measurement
}

for k, v := range tags {
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
}
pt := client.NewPoint(measurement, tags, fields, timestamp)
if ac.debug {
fmt.Println("> " + pt.String())
}
ac.points <- pt
}

var vals []string
func (ac *accumulator) SetDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

for k, v := range fields {
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
}
func (ac *accumulator) AddDefaultTag(key, value string) {
ac.defaultTags[key] = value
}

sort.Strings(tg)
sort.Strings(vals)
func (ac *accumulator) Prefix() string {
return ac.prefix
}

fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
}
func (ac *accumulator) SetPrefix(prefix string) {
ac.prefix = prefix
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

bp.Points = append(bp.Points, client.Point{
Measurement: measurement,
Tags: tags,
Fields: fields,
})
func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}
Loading

0 comments on commit 200cd2e

Please sign in to comment.