Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple listeners per input #6228

Merged
merged 4 commits into from
Apr 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu
- [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size.
- [#6228](https://github.com/influxdata/influxdb/pull/6228): Support for multiple listeners for collectd and OpenTSDB inputs.

### Bugfixes

Expand Down
43 changes: 14 additions & 29 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ type Config struct {
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`

Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
Subscriber subscriber.Config `toml:"subscriber"`
HTTPD httpd.Config `toml:"http"`
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPs []udp.Config `toml:"udp"`
Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
Subscriber subscriber.Config `toml:"subscriber"`
HTTPD httpd.Config `toml:"http"`
GraphiteInputs []graphite.Config `toml:"graphite"`
CollectdInputs []collectd.Config `toml:"collectd"`
OpenTSDBInputs []opentsdb.Config `toml:"opentsdb"`
UDPInputs []udp.Config `toml:"udp"`

ContinuousQuery continuous_querier.Config `toml:"continuous_queries"`

Expand Down Expand Up @@ -79,37 +79,22 @@ func NewConfig() *Config {
c.Monitor = monitor.NewConfig()
c.Subscriber = subscriber.NewConfig()
c.HTTPD = httpd.NewConfig()
c.Collectd = collectd.NewConfig()
c.OpenTSDB = opentsdb.NewConfig()

c.GraphiteInputs = []graphite.Config{graphite.NewConfig()}
c.CollectdInputs = []collectd.Config{collectd.NewConfig()}
c.OpenTSDBInputs = []opentsdb.Config{opentsdb.NewConfig()}
c.UDPInputs = []udp.Config{udp.NewConfig()}

c.ContinuousQuery = continuous_querier.NewConfig()
c.Retention = retention.NewConfig()
c.BindAddress = DefaultBindAddress

// All ARRAY attributes have to be init after toml decode
// See: https://github.com/BurntSushi/toml/pull/68
// Those attributes will be initialized in Config.InitTableAttrs method
// Concerned Attributes:
// * `c.Graphites`
// * `c.UDPs`

return c
}

// InitTableAttrs initialises all ARRAY attributes if empty
func (c *Config) InitTableAttrs() {
if len(c.UDPs) == 0 {
c.UDPs = []udp.Config{udp.NewConfig()}
}
if len(c.Graphites) == 0 {
c.Graphites = []graphite.Config{graphite.NewConfig()}
}
}

// NewDemoConfig returns the config that runs when no config is specified.
func NewDemoConfig() (*Config, error) {
c := NewConfig()
c.InitTableAttrs()

var homeDir string
// By default, store meta and data files in current users home directory
Expand Down Expand Up @@ -141,7 +126,7 @@ func (c *Config) Validate() error {
return err
}

for _, g := range c.Graphites {
for _, g := range c.GraphiteInputs {
if err := g.Validate(); err != nil {
return fmt.Errorf("invalid graphite config: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/influxd/run/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (cmd *PrintConfigCommand) parseConfig(path string) (*Config, error) {
if _, err := toml.DecodeFile(path, &config); err != nil {
return nil, err
}
config.InitTableAttrs()
return config, nil
}

Expand Down
77 changes: 57 additions & 20 deletions cmd/influxd/run/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,21 @@ protocol = "udp"
[[graphite]]
protocol = "tcp"

[collectd]
[[collectd]]
bind-address = ":1000"

[opentsdb]
[[collectd]]
bind-address = ":1010"

[[opentsdb]]
bind-address = ":2000"

[[opentsdb]]
bind-address = ":2010"

[[opentsdb]]
bind-address = ":2020"

[[udp]]
bind-address = ":4444"

Expand All @@ -65,18 +74,24 @@ enabled = true
t.Fatalf("unexpected admin bind address: %s", c.Admin.BindAddress)
} else if c.HTTPD.BindAddress != ":8087" {
t.Fatalf("unexpected api bind address: %s", c.HTTPD.BindAddress)
} else if len(c.Graphites) != 2 {
t.Fatalf("unexpected graphites count: %d", len(c.Graphites))
} else if c.Graphites[0].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol(0): %s", c.Graphites[0].Protocol)
} else if c.Graphites[1].Protocol != "tcp" {
t.Fatalf("unexpected graphite protocol(1): %s", c.Graphites[1].Protocol)
} else if c.Collectd.BindAddress != ":1000" {
t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress)
} else if c.OpenTSDB.BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.UDPs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress)
} else if len(c.GraphiteInputs) != 2 {
t.Fatalf("unexpected graphiteInputs count: %d", len(c.GraphiteInputs))
} else if c.GraphiteInputs[0].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol(0): %s", c.GraphiteInputs[0].Protocol)
} else if c.GraphiteInputs[1].Protocol != "tcp" {
t.Fatalf("unexpected graphite protocol(1): %s", c.GraphiteInputs[1].Protocol)
} else if c.CollectdInputs[0].BindAddress != ":1000" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[0].BindAddress)
} else if c.CollectdInputs[1].BindAddress != ":1010" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
} else if c.OpenTSDBInputs[0].BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress)
} else if c.OpenTSDBInputs[1].BindAddress != ":2010" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[1].BindAddress)
} else if c.OpenTSDBInputs[2].BindAddress != ":2020" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[2].BindAddress)
} else if c.UDPInputs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPInputs[0].BindAddress)
} else if c.Subscriber.Enabled != true {
t.Fatalf("unexpected subscriber enabled: %v", c.Subscriber.Enabled)
} else if c.ContinuousQuery.Enabled != true {
Expand Down Expand Up @@ -111,12 +126,18 @@ protocol = "udp"
[[graphite]]
protocol = "tcp"

[collectd]
[[collectd]]
bind-address = ":1000"

[opentsdb]
[[collectd]]
bind-address = ":1010"

[[opentsdb]]
bind-address = ":2000"

[[opentsdb]]
bind-address = ":2010"

[[udp]]
bind-address = ":4444"

Expand All @@ -137,16 +158,32 @@ enabled = true
t.Fatalf("failed to set env var: %v", err)
}

if err := os.Setenv("INFLUXDB_COLLECTD_1_BIND_ADDRESS", ":1020"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}

if err := os.Setenv("INFLUXDB_OPENTSDB_0_BIND_ADDRESS", ":2020"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}

if err := c.ApplyEnvOverrides(); err != nil {
t.Fatalf("failed to apply env overrides: %v", err)
}

if c.UDPs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress)
if c.UDPInputs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPInputs[0].BindAddress)
}

if c.GraphiteInputs[1].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol: %s", c.GraphiteInputs[1].Protocol)
}

if c.CollectdInputs[1].BindAddress != ":1020" {
t.Fatalf("unexpected collectd bind address: %s", c.CollectdInputs[1].BindAddress)
}

if c.Graphites[1].Protocol != "udp" {
t.Fatalf("unexpected graphite protocol(0): %s", c.Graphites[0].Protocol)
if c.OpenTSDBInputs[0].BindAddress != ":2020" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDBInputs[0].BindAddress)
}
}

Expand Down
20 changes: 12 additions & 8 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,23 @@ func (s *Server) Open() error {
s.appendAdminService(s.config.Admin)
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
s.appendCollectdService(s.config.Collectd)
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
return err
s.appendRetentionPolicyService(s.config.Retention)
for _, i := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(i); err != nil {
return err
}
}
for _, g := range s.config.UDPs {
s.appendUDPService(g)
for _, i := range s.config.CollectdInputs {
s.appendCollectdService(i)
}
s.appendRetentionPolicyService(s.config.Retention)
for _, g := range s.config.Graphites {
if err := s.appendGraphiteService(g); err != nil {
for _, i := range s.config.OpenTSDBInputs {
if err := s.appendOpenTSDBService(i); err != nil {
return err
}
}
for _, i := range s.config.UDPInputs {
s.appendUDPService(i)
}

s.Subscriber.MetaClient = s.MetaClient
s.Subscriber.MetaClient = s.MetaClient
Expand Down
8 changes: 4 additions & 4 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,10 @@ reporting-disabled = false
###
### [collectd]
###
### Controls the listener for collectd data.
### Controls one or many listeners for collectd data.
###

[collectd]
[[collectd]]
enabled = false
# bind-address = ""
# database = ""
Expand All @@ -233,10 +233,10 @@ reporting-disabled = false
###
### [opentsdb]
###
### Controls the listener for OpenTSDB data.
### Controls one or many listeners for OpenTSDB data.
###

[opentsdb]
[[opentsdb]]
enabled = false
# bind-address = ":4242"
# database = "opentsdb"
Expand Down
2 changes: 1 addition & 1 deletion services/collectd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Please note that UDP packets larger than the standard size of 1452 are dropped a
## Config Example

```
[collectd]
[[collectd]]
enabled = true
bind-address = ":25826" # the bind address
database = "collectd" # Name of the database that will be written to
Expand Down
32 changes: 32 additions & 0 deletions services/collectd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,35 @@ func NewConfig() Config {
TypesDB: DefaultTypesDB,
}
}

// WithDefaults takes the given config and returns a new config with any required
// default values set.
func (c *Config) WithDefaults() *Config {
d := *c
if d.BindAddress == "" {
d.BindAddress = DefaultBindAddress
}
if d.Database == "" {
d.Database = DefaultDatabase
}
if d.RetentionPolicy == "" {
d.RetentionPolicy = DefaultRetentionPolicy
}
if d.BatchSize == 0 {
d.BatchSize = DefaultBatchSize
}
if d.BatchPending == 0 {
d.BatchPending = DefaultBatchPending
}
if d.BatchDuration == 0 {
d.BatchDuration = DefaultBatchDuration
}
if d.ReadBuffer == 0 {
d.ReadBuffer = DefaultReadBuffer
}
if d.TypesDB == "" {
d.TypesDB = DefaultTypesDB
}

return &d
}
8 changes: 5 additions & 3 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ type Service struct {

// NewService returns a new instance of the collectd service.
func NewService(c Config) *Service {
s := &Service{
Config: &c,
s := Service{
// Use defaults where necessary.
Config: c.WithDefaults(),

Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
}

return s
return &s
}

// Open starts the service.
Expand Down
8 changes: 4 additions & 4 deletions services/opentsdb/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
openTSDB Input
OpenTSDB Input
============
InfluxDB supports both the telnet and HTTP openTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your openTSDB system.
InfluxDB supports both the telnet and HTTP OpenTSDB protocol. This means that InfluxDB can act as a drop-in replacement for your OpenTSDB system.

## Configuration
The openTSDB input allows the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist.
The OpenTSDB inputs allow the binding address, target database, and target retention policy within that database, to be set. If the database does not exist, it will be created automatically when the input is initialized. If you also decide to configure retention policy (without configuration the input will use the auto-created default retention policy), both the database and retention policy must already exist.

The write-consistency-level can also be set. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is `ONE`.

The openTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.
The OpenTSDB input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.
Loading