Skip to content

Commit

Permalink
Merging in PR #107, support for multiple outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Aug 12, 2015
2 parents 9e2f8f6 + 574babd commit a6d305e
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 101 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pkg/
tivan
.vagrant
telegraf
129 changes: 72 additions & 57 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package telegraf
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
)

type runningOutput struct {
name string
output outputs.Output
}

type runningPlugin struct {
name string
plugin plugins.Plugin
Expand All @@ -32,9 +36,8 @@ type Agent struct {

Config *Config

outputs []*runningOutput
plugins []*runningPlugin

conn *client.Client
}

// NewAgent returns an Agent struct based off the given Config
Expand Down Expand Up @@ -66,36 +69,41 @@ func NewAgent(config *Config) (*Agent, error) {

// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
config := a.Config

u, err := url.Parse(config.URL)
if err != nil {
return err
for _, o := range a.outputs {
err := o.output.Connect()
if err != nil {
return err
}
}
return nil
}

c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
Timeout: config.Timeout.Duration,
})
// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) {
var names []string

if err != nil {
return err
}
for _, name := range a.Config.OutputsDeclared() {
creator, ok := outputs.Outputs[name]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}

_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE %s", config.Database),
})
output := creator()

err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}

if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
}

a.conn = c
sort.Strings(names)

return nil
return names, nil
}

// LoadPlugins loads the agent's plugins
Expand Down Expand Up @@ -174,61 +182,59 @@ func (a *Agent) crankParallel() error {

close(points)

var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
var bp BatchPoints
bp.Time = time.Now()
bp.Tags = a.Config.Tags

for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
bp.Points = append(bp.Points, sub.Points...)
}

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crank() error {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Time = time.Now()
bp.Tags = a.Config.Tags

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)

for {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Tags = a.Config.Tags
bp.Time = time.Now()

a.conn.Write(acc.BatchPoints)
err = a.flush(&bp)
if err != nil {
return err
}

select {
case <-shutdown:
Expand All @@ -239,6 +245,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}

func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
}(o)
}

wg.Wait()

return outerr
}

// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
Expand Down Expand Up @@ -297,13 +319,6 @@ func (a *Agent) Test() error {

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()
if err != nil {
return err
}
}

var wg sync.WaitGroup

for _, plugin := range a.plugins {
Expand Down
29 changes: 19 additions & 10 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/influxdb/telegraf"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all"
)

Expand All @@ -21,16 +22,13 @@ var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fPLuginsFilter = flag.String("filter", "", "filter the plugins to enable, separator is :")

// Telegraf version
var Version = "unreleased"

// Telegraf commit
var Commit = ""
var Version = "0.1.5-dev"

func main() {
flag.Parse()

if *fVersion {
fmt.Printf("InfluxDB Telegraf agent - Version %s\n", Version)
fmt.Printf("Telegraf - Version %s\n", Version)
return
}

Expand Down Expand Up @@ -62,6 +60,15 @@ func main() {
ag.Debug = true
}

outputs, err := ag.LoadOutputs()
if err != nil {
log.Fatal(err)
}
if len(outputs) == 0 {
log.Printf("Error: no outputs found, did you provide a config file?")
os.Exit(1)
}

plugins, err := ag.LoadPlugins(*fPLuginsFilter)
if err != nil {
log.Fatal(err)
Expand All @@ -70,6 +77,10 @@ func main() {
log.Printf("Error: no plugins found, did you provide a config file?")
os.Exit(1)
}
if len(plugins) == 0 {
log.Printf("Error: no plugins found, did you provide a config file?")
os.Exit(1)
}

if *fTest {
if *fConfig != "" {
Expand Down Expand Up @@ -101,18 +112,16 @@ func main() {
close(shutdown)
}()

log.Print("InfluxDB Agent running")
log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}

if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}
log.Printf("Tags enabled: %v", config.ListTags())

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
Expand Down
Loading

0 comments on commit a6d305e

Please sign in to comment.