diff --git a/.gitignore b/.gitignore index a127b89f7d703..a471ffe03ff2c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ pkg/ tivan .vagrant +telegraf diff --git a/CHANGELOG.md b/CHANGELOG.md index dc226525905bb..e94713337a731 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#98](https://github.com/influxdb/telegraf/pull/98): LeoFS plugin. Thanks @mocchira! - [#103](https://github.com/influxdb/telegraf/pull/103): Filter by metric tags. Thanks @srfraser! - [#106](https://github.com/influxdb/telegraf/pull/106): Options to filter plugins on startup. Thanks @zepouet! +- [#107](https://github.com/influxdb/telegraf/pull/107): Multiple outputs beyong influxdb. Thanks @jipperinbham! ### Bugfixes - [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users diff --git a/agent.go b/agent.go index ef687bd308777..1c90bf1a4fde4 100644 --- a/agent.go +++ b/agent.go @@ -3,17 +3,21 @@ package telegraf import ( "fmt" "log" - "net/url" "os" "sort" "strings" "sync" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" ) +type runningOutput struct { + name string + output outputs.Output +} + type runningPlugin struct { name string plugin plugins.Plugin @@ -32,9 +36,8 @@ type Agent struct { Config *Config + outputs []*runningOutput plugins []*runningPlugin - - conn *client.Client } // NewAgent returns an Agent struct based off the given Config @@ -64,38 +67,50 @@ func NewAgent(config *Config) (*Agent, error) { return agent, nil } -// Connect connects to the agent's config URL +// Connect connects to all configured outputs func (a *Agent) Connect() error { - config := a.Config + for _, o := range a.outputs { + err := o.output.Connect() + if err != nil { + return err + } + } + return nil +} - u, err := url.Parse(config.URL) - if err != nil { - return err +// Close closes the connection to all configured outputs +func (a *Agent) Close() error { + var err error + for _, o := range a.outputs { + err = o.output.Close() } + return err +} - c, err := client.NewClient(client.Config{ - URL: *u, - Username: config.Username, - Password: config.Password, - UserAgent: config.UserAgent, - Timeout: config.Timeout.Duration, - }) +// LoadOutputs loads the agent's outputs +func (a *Agent) LoadOutputs() ([]string, error) { + var names []string - if err != nil { - return err - } + for _, name := range a.Config.OutputsDeclared() { + creator, ok := outputs.Outputs[name] + if !ok { + return nil, fmt.Errorf("Undefined but requested output: %s", name) + } - _, err = c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE %s", config.Database), - }) + output := creator() - if err != nil && !strings.Contains(err.Error(), "database already exists") { - log.Fatal(err) + err := a.Config.ApplyOutput(name, output) + if err != nil { + return nil, err + } + + a.outputs = append(a.outputs, &runningOutput{name, output}) + names = append(names, name) } - a.conn = c + sort.Strings(names) - return nil + return names, nil } // LoadPlugins loads the agent's plugins @@ -174,61 +189,59 @@ func (a *Agent) crankParallel() error { close(points) - var acc BatchPoints - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + var bp BatchPoints + bp.Time = time.Now() + bp.Tags = a.Config.Tags for sub := range points { - acc.Points = append(acc.Points, sub.Points...) + bp.Points = append(bp.Points, sub.Points...) } - _, err := a.conn.Write(acc.BatchPoints) - return err + return a.flush(&bp) } func (a *Agent) crank() error { - var acc BatchPoints + var bp BatchPoints - acc.Debug = a.Debug + bp.Debug = a.Debug for _, plugin := range a.plugins { - acc.Prefix = plugin.name + "_" - acc.Config = plugin.config - err := plugin.plugin.Gather(&acc) + bp.Prefix = plugin.name + "_" + bp.Config = plugin.config + err := plugin.plugin.Gather(&bp) if err != nil { return err } } - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + bp.Time = time.Now() + bp.Tags = a.Config.Tags - _, err := a.conn.Write(acc.BatchPoints) - return err + return a.flush(&bp) } func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { ticker := time.NewTicker(plugin.config.Interval) for { - var acc BatchPoints + var bp BatchPoints - acc.Debug = a.Debug + bp.Debug = a.Debug - acc.Prefix = plugin.name + "_" - acc.Config = plugin.config - err := plugin.plugin.Gather(&acc) + bp.Prefix = plugin.name + "_" + bp.Config = plugin.config + err := plugin.plugin.Gather(&bp) if err != nil { return err } - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + bp.Tags = a.Config.Tags + bp.Time = time.Now() - a.conn.Write(acc.BatchPoints) + err = a.flush(&bp) + if err != nil { + return err + } select { case <-shutdown: @@ -239,6 +252,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } +func (a *Agent) flush(bp *BatchPoints) error { + var wg sync.WaitGroup + var outerr error + for _, o := range a.outputs { + wg.Add(1) + go func(ro *runningOutput) { + defer wg.Done() + outerr = ro.output.Write(bp.BatchPoints) + }(o) + } + + wg.Wait() + + return outerr +} + // TestAllPlugins verifies that we can 'Gather' from all plugins with the // default configuration func (a *Agent) TestAllPlugins() error { @@ -297,13 +326,6 @@ func (a *Agent) Test() error { // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { - if a.conn == nil { - err := a.Connect() - if err != nil { - return err - } - } - var wg sync.WaitGroup for _, plugin := range a.plugins { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 1b6ad088a9bca..d6fdd965bd25c 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -59,6 +59,11 @@ func main() { ag.Debug = true } + outputs, err := ag.LoadOutputs() + if err != nil { + log.Fatal(err) + } + plugins, err := ag.LoadPlugins(*fPLuginsFilter) if err != nil { log.Fatal(err) @@ -99,6 +104,7 @@ func main() { }() log.Printf("Starting Telegraf (version %s)\n", Version) + log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) if ag.Debug { log.Printf("Debug: enabled") @@ -106,11 +112,6 @@ func main() { ag.Interval, ag.Debug, ag.Hostname) } - if config.URL != "" { - log.Printf("Sending metrics to: %s", config.URL) - log.Printf("Tags enabled: %v", config.ListTags()) - } - if *fPidfile != "" { f, err := os.Create(*fPidfile) if err != nil { diff --git a/config.go b/config.go index 48df238ac858b..db49dfa2f0455 100644 --- a/config.go +++ b/config.go @@ -34,16 +34,11 @@ func (d *Duration) UnmarshalTOML(b []byte) error { // will be logging to, as well as all the plugins that the user has // specified type Config struct { - URL string - Username string - Password string - Database string - UserAgent string - Timeout Duration - Tags map[string]string + Tags map[string]string agent *ast.Table plugins map[string]*ast.Table + outputs map[string]*ast.Table } // Plugins returns the configured plugins as a map of name -> plugin toml @@ -51,6 +46,11 @@ func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } +// Outputs returns the configured outputs as a map of name -> output toml +func (c *Config) Outputs() map[string]*ast.Table { + return c.outputs +} + // The name of a tag, and the values on which to filter type TagFilter struct { Name string @@ -122,6 +122,14 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin return true } +// ApplyOutput loads the toml config into the given interface +func (c *Config) ApplyOutput(name string, v interface{}) error { + if c.outputs[name] != nil { + return toml.UnmarshalTable(c.outputs[name], v) + } + return nil +} + // ApplyAgent loads the toml config into the given interface func (c *Config) ApplyAgent(v interface{}) error { if c.agent != nil { @@ -225,15 +233,24 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err // PluginsDeclared returns the name of all plugins declared in the config. func (c *Config) PluginsDeclared() []string { - var plugins []string + return declared(c.plugins) +} + +// OutputsDeclared returns the name of all outputs declared in the config. +func (c *Config) OutputsDeclared() []string { + return declared(c.outputs) +} - for name := range c.plugins { - plugins = append(plugins, name) +func declared(endpoints map[string]*ast.Table) []string { + var names []string + + for name := range endpoints { + names = append(names, name) } - sort.Strings(plugins) + sort.Strings(names) - return plugins + return names } // DefaultConfig returns an empty default configuration @@ -257,6 +274,7 @@ func LoadConfig(path string) (*Config, error) { c := &Config{ plugins: make(map[string]*ast.Table), + outputs: make(map[string]*ast.Table), } for name, val := range tbl.Fields { @@ -266,13 +284,16 @@ func LoadConfig(path string) (*Config, error) { } switch name { - case "influxdb": - err := toml.UnmarshalTable(subtbl, c) - if err != nil { - return nil, err - } case "agent": c.agent = subtbl + case "outputs": + for outputName, outputVal := range subtbl.Fields { + outputSubtbl, ok := outputVal.(*ast.Table) + if !ok { + return nil, errInvalidConfig + } + c.outputs[outputName] = outputSubtbl + } default: c.plugins[name] = subtbl } @@ -327,8 +348,11 @@ var header = `# Telegraf configuration # NOTE: The configuration has a few required parameters. They are marked # with 'required'. Be sure to edit those to make this configuration work. +# OUTPUTS +[outputs] + # Configuration for influxdb server to send metrics to -[influxdb] +[outputs.influxdb] # The full HTTP endpoint URL for your InfluxDB instance url = "http://localhost:8086" # required. @@ -345,12 +369,11 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" -# tags = { "dc": "us-east-1" } # Tags can also be specified via a normal map, but only one form at a time: -# [influxdb.tags] -# dc = "us-east-1" +# [tags] +# dc = "us-east-1" } # Configuration for telegraf itself # [agent] diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 38cfeba684411..9faf5b871a9e9 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -35,12 +35,11 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" -# tags = { "dc": "us-east-1" } # Tags can also be specified via a normal map, but only one form at a time: # [influxdb.tags] -# dc = "us-east-1" +# tags = { "dc" = "us-east-1" } # Configuration for telegraf itself # [agent] diff --git a/outputs/all/all.go b/outputs/all/all.go new file mode 100644 index 0000000000000..2a8018674349c --- /dev/null +++ b/outputs/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdb/telegraf/outputs/influxdb" +) diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go new file mode 100644 index 0000000000000..96505a4d7a007 --- /dev/null +++ b/outputs/influxdb/influxdb.go @@ -0,0 +1,72 @@ +package influxdb + +import ( + "fmt" + "log" + "net/url" + "strings" + + "github.com/influxdb/influxdb/client" + t "github.com/influxdb/telegraf" + "github.com/influxdb/telegraf/outputs" +) + +type InfluxDB struct { + URL string + Username string + Password string + Database string + UserAgent string + Timeout t.Duration + + conn *client.Client +} + +func (i *InfluxDB) Connect() error { + u, err := url.Parse(i.URL) + if err != nil { + return err + } + + c, err := client.NewClient(client.Config{ + URL: *u, + Username: i.Username, + Password: i.Password, + UserAgent: i.UserAgent, + Timeout: i.Timeout.Duration, + }) + + if err != nil { + return err + } + + _, err = c.Query(client.Query{ + Command: fmt.Sprintf("CREATE DATABASE telegraf"), + }) + + if err != nil && !strings.Contains(err.Error(), "database already exists") { + log.Fatal(err) + } + + i.conn = c + return nil +} + +func (i *InfluxDB) Close() error { + // InfluxDB client does not provide a Close() function + return nil +} + +func (i *InfluxDB) Write(bp client.BatchPoints) error { + bp.Database = i.Database + if _, err := i.conn.Write(bp); err != nil { + return err + } + return nil +} + +func init() { + outputs.Add("influxdb", func() outputs.Output { + return &InfluxDB{} + }) +} diff --git a/outputs/registry.go b/outputs/registry.go new file mode 100644 index 0000000000000..a2f22f73b1f1a --- /dev/null +++ b/outputs/registry.go @@ -0,0 +1,19 @@ +package outputs + +import ( + "github.com/influxdb/influxdb/client" +) + +type Output interface { + Connect() error + Close() error + Write(client.BatchPoints) error +} + +type Creator func() Output + +var Outputs = map[string]Creator{} + +func Add(name string, creator Creator) { + Outputs[name] = creator +} diff --git a/testdata/influx.toml b/testdata/influx.toml index fc5d3c493051b..492528cae92fc 100644 --- a/testdata/influx.toml +++ b/testdata/influx.toml @@ -3,12 +3,15 @@ interval = "5s" http = ":11213" debug = true -[influxdb] +[outputs] +[outputs.influxdb] url = "http://localhost:8086" username = "root" password = "root" database = "telegraf" -tags = { dc = "us-phx-1" } + +[tags] +dc = "us-phx-1" } [redis] address = ":6379" diff --git a/testdata/telegraf-agent.toml b/testdata/telegraf-agent.toml index b5899559423f7..fd0221a4788fc 100644 --- a/testdata/telegraf-agent.toml +++ b/testdata/telegraf-agent.toml @@ -23,7 +23,8 @@ # with 'required'. Be sure to edit those to make this configuration work. # Configuration for influxdb server to send metrics to -[influxdb] +[outputs] +[outputs.influxdb] # The full HTTP endpoint URL for your InfluxDB instance url = "http://localhost:8086" # required. @@ -40,11 +41,10 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" -# tags = { "dc": "us-east-1" } # Tags can also be specified via a normal map, but only one form at a time: -# [influxdb.tags] +# [tags] # dc = "us-east-1" # Configuration for telegraf itself @@ -204,11 +204,11 @@ urls = ["localhost/status"] # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] # or a simple string: # host=localhost user=pqotest password=... sslmode=... -# +# # All connection parameters are optional. By default, the host is localhost # and the user is the currently running user. For localhost, we default # to sslmode=disable as well. -# +# address = "sslmode=disable"