diff --git a/CHANGELOG.md b/CHANGELOG.md index 0de67bf174..d6d2ef486c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,29 @@ Or for a stream task with use a query directly: kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM telegraf."default".cpu WHERE time > now() - 10h' ``` +#### HTTP based subscriptions + +Now InfluxDB and Kapacitor support HTTP/S based subscriptions. +This means that Kapacitor need only listen on a single port for the HTTP service, greatly simplifying configuration and setup. + +In order to start using HTTP subscriptions change the `subscription-protocol` option for your configured InfluxDB clusters. + +For example: + +``` +[[influxdb]] + enabled = true + urls = ["http://localhost:8086",] + subscription-protocol = "http" + # or to use https + #subscription-protocol = "https" +``` + +On startup Kapacitor will detect the change and recreate the subscriptions in InfluxDB to use the HTTP protocol. + +>NOTE: While HTTP itself is a TCP transport such that packet loss shouldn't be an issue, if Kapacitor starts to slow down for whatever reason, InfluxDB will drop the subscription writes to Kapacitor. +In order to know if subscription writes are being dropped you should monitor the measurement `_internal.monitor.subscriber` for the field `writeFailures`. + ### Features @@ -95,6 +118,7 @@ kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM teleg - [#82](https://github.com/influxdata/kapacitor/issues/82): Multiple services for PagerDuty alert. - [#558](https://github.com/influxdata/kapacitor/pull/558): Preserve fields as well as tags on selector InfluxQL functions. - [#259](https://github.com/influxdata/kapacitor/issues/259): Template Tasks have been added. +- [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions. ### Bugfixes diff --git a/cmd/kapacitord/run/config.go b/cmd/kapacitord/run/config.go index ff0238d9da..cf2c26881b 100644 --- a/cmd/kapacitord/run/config.go +++ b/cmd/kapacitord/run/config.go @@ -157,6 +157,10 @@ func (c *Config) Validate() error { if err != nil { return err } + err = c.HTTP.Validate() + if err != nil { + return err + } err = c.Task.Validate() if err != nil { return err diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index 7aeb95a46e..b121996404 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -124,12 +124,18 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (* return nil, err } + // Determine HTTP port + httpPort, err := c.HTTP.Port() + if err != nil { + return nil, err + } + // Append Kapacitor services. s.appendUDFService(c.UDF) s.appendDeadmanService(c.Deadman) s.appendSMTPService(c.SMTP) s.initHTTPDService(c.HTTP) - s.appendInfluxDBService(c.InfluxDB, c.defaultInfluxDB, c.Hostname) + s.appendInfluxDBService(c.InfluxDB, c.defaultInfluxDB, httpPort, c.Hostname) s.appendStorageService(c.Storage) s.appendTaskStoreService(c.Task) s.appendReplayService(c.Replay) @@ -185,10 +191,10 @@ func (s *Server) appendSMTPService(c smtp.Config) { } } -func (s *Server) appendInfluxDBService(c []influxdb.Config, defaultInfluxDB int, hostname string) { +func (s *Server) appendInfluxDBService(c []influxdb.Config, defaultInfluxDB, httpPort int, hostname string) { if len(c) > 0 { l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags) - srv := influxdb.NewService(c, defaultInfluxDB, hostname, l) + srv := influxdb.NewService(c, defaultInfluxDB, httpPort, hostname, l) srv.PointsWriter = s.TaskMaster srv.LogService = s.LogService diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index b924d73363..637a0461c5 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -83,6 +83,12 @@ data_dir = "/var/lib/kapacitor" # This is insecure. insecure-skip-verify = false + # Host part of a bind addres for UDP listeners. + # For example if a UDP listener is using port 1234 + # and `udp-bind = "hostname_or_ip"`, + # then the UDP port will be bound to `hostname_or_ip:1234` + # The default empty value will bind to all addresses. + udp-bind = "" # Subscriptions use the UDP network protocl. # The following options of for the created UDP listeners for each subscription. # Number of packets to buffer when reading packets off the socket. diff --git a/services/httpd/config.go b/services/httpd/config.go index 5d01630c55..22a8a88b9c 100644 --- a/services/httpd/config.go +++ b/services/httpd/config.go @@ -1,9 +1,13 @@ package httpd import ( + "fmt" + "net" + "strconv" "time" "github.com/influxdata/influxdb/toml" + "github.com/pkg/errors" ) const ( @@ -34,3 +38,31 @@ func NewConfig() Config { GZIP: true, } } + +func (c Config) Validate() error { + _, port, err := net.SplitHostPort(c.BindAddress) + if err != nil { + return errors.Wrapf(err, "invalid http bind address %s", c.BindAddress) + } + if port == "" { + return errors.Wrapf(err, "invalid http bind address, no port specified %s", c.BindAddress) + } + if pn, err := strconv.ParseInt(port, 10, 64); err != nil { + return errors.Wrapf(err, "invalid http bind address port %s", port) + } else if pn > 65535 || pn < 0 { + return fmt.Errorf("invalid http bind address port %d: out of range", pn) + } + + return nil +} + +// Determine HTTP port from BindAddress. +func (c Config) Port() (int, error) { + if err := c.Validate(); err != nil { + return -1, err + } + // Ignore errors since we already validated + _, portStr, _ := net.SplitHostPort(c.BindAddress) + port, _ := strconv.ParseInt(portStr, 10, 64) + return int(port), nil +} diff --git a/services/influxdb/config.go b/services/influxdb/config.go index b22d57cdc6..c788581b94 100644 --- a/services/influxdb/config.go +++ b/services/influxdb/config.go @@ -2,6 +2,7 @@ package influxdb import ( "errors" + "fmt" "net/url" "time" @@ -13,6 +14,8 @@ import ( const ( // Maximum time to try and connect to InfluxDB during startup. DefaultStartUpTimeout = time.Minute * 5 + + DefaultSubscriptionProtocol = "http" ) type Config struct { @@ -33,8 +36,10 @@ type Config struct { Timeout toml.Duration `toml:"timeout"` DisableSubscriptions bool `toml:"disable-subscriptions"` + SubscriptionProtocol string `toml:"subscription-protocol"` Subscriptions map[string][]string `toml:"subscriptions"` ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"` + UDPBind string `toml:"udp-bind"` UDPBuffer int `toml:"udp-buffer"` UDPReadBuffer int `toml:"udp-read-buffer"` StartUpTimeout toml.Duration `toml:"startup-timeout"` @@ -52,8 +57,9 @@ func NewConfig() Config { ExcludedSubscriptions: map[string][]string{ stats.DefaultDatabse: []string{stats.DefaultRetentionPolicy}, }, - UDPBuffer: udp.DefaultBuffer, - StartUpTimeout: toml.Duration(DefaultStartUpTimeout), + UDPBuffer: udp.DefaultBuffer, + StartUpTimeout: toml.Duration(DefaultStartUpTimeout), + SubscriptionProtocol: DefaultSubscriptionProtocol, } } @@ -74,5 +80,10 @@ func (c Config) Validate() error { return err } } + switch c.SubscriptionProtocol { + case "http", "https", "udp": + default: + return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %s", c.SubscriptionProtocol) + } return nil } diff --git a/services/influxdb/service.go b/services/influxdb/service.go index e463d3075a..e48337ac16 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -10,6 +10,7 @@ import ( "log" "net" "net/url" + "strconv" "strings" "time" @@ -40,7 +41,7 @@ type Service struct { logger *log.Logger } -func NewService(configs []Config, defaultInfluxDB int, hostname string, l *log.Logger) *Service { +func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, l *log.Logger) *Service { clusterID := kapacitor.ClusterIDVar.StringValue() subName := subNamePrefix + clusterID clusters := make(map[string]*influxdb, len(configs)) @@ -84,13 +85,16 @@ func NewService(configs []Config, defaultInfluxDB int, hostname string, l *log.L configSubs: subs, exConfigSubs: exSubs, hostname: hostname, + httpPort: httpPort, logger: l, + udpBind: c.UDPBind, udpBuffer: c.UDPBuffer, udpReadBuffer: c.UDPReadBuffer, startupTimeout: time.Duration(c.StartUpTimeout), clusterID: clusterID, subName: subName, disableSubs: c.DisableSubscriptions, + protocol: c.SubscriptionProtocol, } if defaultInfluxDB == i { defaultInfluxDBName = c.Name @@ -145,7 +149,10 @@ type influxdb struct { configSubs map[subEntry]bool exConfigSubs map[subEntry]bool hostname string + httpPort int logger *log.Logger + protocol string + udpBind string udpBuffer int udpReadBuffer int startupTimeout time.Duration @@ -329,7 +336,34 @@ func (s *influxdb) linkSubscriptions() error { } existingSubs[se] = si } else if se.name == s.subName { - existingSubs[se] = si + if len(si.Destinations) == 0 { + s.logger.Println("E! found subscription without any destinations:", se) + continue + } + u, err := url.Parse(si.Destinations[0]) + if err != nil { + s.logger.Println("E! found subscription with invalid destinations:", si) + continue + } + host, port, err := net.SplitHostPort(u.Host) + if err != nil { + s.logger.Println("E! found subscription with invalid destinations:", si) + continue + } + pn, err := strconv.ParseInt(port, 10, 64) + if err != nil { + s.logger.Println("E! found subscription with invalid destinations:", si) + continue + } + // Check if the hostname, port or protocol have changed + if host != s.hostname || + u.Scheme != s.protocol || + (u.Scheme == "http" && int(pn) != s.httpPort) { + // Something changed, drop the sub and let it get recreated + s.dropSub(cli, se.name, se.cluster, se.rp) + } else { + existingSubs[se] = si + } } } } @@ -347,12 +381,14 @@ func (s *influxdb) linkSubscriptions() error { s.logger.Println("E! invalid URL in subscription destinations:", err) continue } - pair := strings.Split(u.Host, ":") - if pair[0] == s.hostname { + host, port, err := net.SplitHostPort(u.Host) + if host == s.hostname { numSubscriptions++ - _, err := s.startListener(se.cluster, se.rp, *u) - if err != nil { - s.logger.Println("E! failed to start listener:", err) + if u.Scheme == "udp" { + _, err := s.startUDPListener(se.cluster, se.rp, port) + if err != nil { + s.logger.Println("E! failed to start UDP listener:", err) + } } startedSubs[se] = true break @@ -364,19 +400,19 @@ func (s *influxdb) linkSubscriptions() error { for _, se := range allSubs { // If we have been configured to subscribe and the subscription is not started yet. if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] { - u, err := url.Parse("udp://:0") - if err != nil { - return fmt.Errorf("could not create valid destination url, is hostname correct? err: %s", err) + var destination string + switch s.protocol { + case "http", "https": + destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort) + case "udp": + addr, err := s.startUDPListener(se.cluster, se.rp, "0") + if err != nil { + s.logger.Println("E! failed to start UDP listener:", err) + } + destination = fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port) } numSubscriptions++ - addr, err := s.startListener(se.cluster, se.rp, *u) - if err != nil { - s.logger.Println("E! failed to start listener:", err) - } - - // Get port from addr - destination := fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port) err = s.createSub(cli, se.name, se.cluster, se.rp, "ANY", []string{destination}) if err != nil { @@ -425,29 +461,25 @@ func (s *influxdb) dropSub(cli client.Client, name, cluster, rp string) (err err return } -func (s *influxdb) startListener(cluster, rp string, u url.URL) (*net.UDPAddr, error) { - switch u.Scheme { - case "udp": - c := udp.Config{} - c.Enabled = true - c.BindAddress = u.Host - c.Database = cluster - c.RetentionPolicy = rp - c.Buffer = s.udpBuffer - c.ReadBuffer = s.udpReadBuffer - - l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags) - service := udp.NewService(c, l) - service.PointsWriter = s.PointsWriter - err := service.Open() - if err != nil { - return nil, err - } - s.services = append(s.services, service) - s.logger.Println("I! started UDP listener for", cluster, rp) - return service.Addr(), nil +func (s *influxdb) startUDPListener(cluster, rp, port string) (*net.UDPAddr, error) { + c := udp.Config{} + c.Enabled = true + c.BindAddress = fmt.Sprintf("%s:%s", s.udpBind, port) + c.Database = cluster + c.RetentionPolicy = rp + c.Buffer = s.udpBuffer + c.ReadBuffer = s.udpReadBuffer + + l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags) + service := udp.NewService(c, l) + service.PointsWriter = s.PointsWriter + err := service.Open() + if err != nil { + return nil, err } - return nil, fmt.Errorf("unsupported scheme %q", u.Scheme) + s.services = append(s.services, service) + s.logger.Println("I! started UDP listener for", cluster, rp) + return service.Addr(), nil } func (s *influxdb) execQuery(cli client.Client, q string) (*client.Response, error) {