Skip to content

Commit

Permalink
Support HTTP based subscriptions
Browse files Browse the repository at this point in the history
UDP bind address configurable
HTTP port is determined from [http] bind address config
  • Loading branch information
nathanielc committed May 31, 2016
1 parent dc2c9f4 commit bee75e2
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 44 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,29 @@ Or for a stream task with use a query directly:
kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM telegraf."default".cpu WHERE time > now() - 10h'
```

#### HTTP based subscriptions

Now InfluxDB and Kapacitor support HTTP/S based subscriptions.
This means that Kapacitor need only listen on a single port for the HTTP service, greatly simplifying configuration and setup.

In order to start using HTTP subscriptions change the `subscription-protocol` option for your configured InfluxDB clusters.

For example:

```
[[influxdb]]
enabled = true
urls = ["http://localhost:8086",]
subscription-protocol = "http"
# or to use https
#subscription-protocol = "https"
```

On startup Kapacitor will detect the change and recreate the subscriptions in InfluxDB to use the HTTP protocol.

>NOTE: While HTTP itself is a TCP transport such that packet loss shouldn't be an issue, if Kapacitor starts to slow down for whatever reason, InfluxDB will drop the subscription writes to Kapacitor.
In order to know if subscription writes are being dropped you should monitor the measurement `_internal.monitor.subscriber` for the field `writeFailures`.


### Features

Expand All @@ -95,6 +118,7 @@ kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM teleg
- [#82](https://github.com/influxdata/kapacitor/issues/82): Multiple services for PagerDuty alert.
- [#558](https://github.com/influxdata/kapacitor/pull/558): Preserve fields as well as tags on selector InfluxQL functions.
- [#259](https://github.com/influxdata/kapacitor/issues/259): Template Tasks have been added.
- [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions.


### Bugfixes
Expand Down
4 changes: 4 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (c *Config) Validate() error {
if err != nil {
return err
}
err = c.HTTP.Validate()
if err != nil {
return err
}
err = c.Task.Validate()
if err != nil {
return err
Expand Down
12 changes: 9 additions & 3 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,18 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
return nil, err
}

// Determine HTTP port
httpPort, err := c.HTTP.Port()
if err != nil {
return nil, err
}

// Append Kapacitor services.
s.appendUDFService(c.UDF)
s.appendDeadmanService(c.Deadman)
s.appendSMTPService(c.SMTP)
s.initHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.defaultInfluxDB, c.Hostname)
s.appendInfluxDBService(c.InfluxDB, c.defaultInfluxDB, httpPort, c.Hostname)
s.appendStorageService(c.Storage)
s.appendTaskStoreService(c.Task)
s.appendReplayService(c.Replay)
Expand Down Expand Up @@ -185,10 +191,10 @@ func (s *Server) appendSMTPService(c smtp.Config) {
}
}

func (s *Server) appendInfluxDBService(c []influxdb.Config, defaultInfluxDB int, hostname string) {
func (s *Server) appendInfluxDBService(c []influxdb.Config, defaultInfluxDB, httpPort int, hostname string) {
if len(c) > 0 {
l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
srv := influxdb.NewService(c, defaultInfluxDB, hostname, l)
srv := influxdb.NewService(c, defaultInfluxDB, httpPort, hostname, l)
srv.PointsWriter = s.TaskMaster
srv.LogService = s.LogService

Expand Down
6 changes: 6 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ data_dir = "/var/lib/kapacitor"
# This is insecure.
insecure-skip-verify = false

# Host part of a bind addres for UDP listeners.
# For example if a UDP listener is using port 1234
# and `udp-bind = "hostname_or_ip"`,
# then the UDP port will be bound to `hostname_or_ip:1234`
# The default empty value will bind to all addresses.
udp-bind = ""
# Subscriptions use the UDP network protocl.
# The following options of for the created UDP listeners for each subscription.
# Number of packets to buffer when reading packets off the socket.
Expand Down
32 changes: 32 additions & 0 deletions services/httpd/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package httpd

import (
"fmt"
"net"
"strconv"
"time"

"github.com/influxdata/influxdb/toml"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -34,3 +38,31 @@ func NewConfig() Config {
GZIP: true,
}
}

func (c Config) Validate() error {
_, port, err := net.SplitHostPort(c.BindAddress)
if err != nil {
return errors.Wrapf(err, "invalid http bind address %s", c.BindAddress)
}
if port == "" {
return errors.Wrapf(err, "invalid http bind address, no port specified %s", c.BindAddress)
}
if pn, err := strconv.ParseInt(port, 10, 64); err != nil {
return errors.Wrapf(err, "invalid http bind address port %s", port)
} else if pn > 65535 || pn < 0 {
return fmt.Errorf("invalid http bind address port %d: out of range", pn)
}

return nil
}

// Determine HTTP port from BindAddress.
func (c Config) Port() (int, error) {
if err := c.Validate(); err != nil {
return -1, err
}
// Ignore errors since we already validated
_, portStr, _ := net.SplitHostPort(c.BindAddress)
port, _ := strconv.ParseInt(portStr, 10, 64)
return int(port), nil
}
15 changes: 13 additions & 2 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package influxdb

import (
"errors"
"fmt"
"net/url"
"time"

Expand All @@ -13,6 +14,8 @@ import (
const (
// Maximum time to try and connect to InfluxDB during startup.
DefaultStartUpTimeout = time.Minute * 5

DefaultSubscriptionProtocol = "http"
)

type Config struct {
Expand All @@ -33,8 +36,10 @@ type Config struct {

Timeout toml.Duration `toml:"timeout"`
DisableSubscriptions bool `toml:"disable-subscriptions"`
SubscriptionProtocol string `toml:"subscription-protocol"`
Subscriptions map[string][]string `toml:"subscriptions"`
ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"`
UDPBind string `toml:"udp-bind"`
UDPBuffer int `toml:"udp-buffer"`
UDPReadBuffer int `toml:"udp-read-buffer"`
StartUpTimeout toml.Duration `toml:"startup-timeout"`
Expand All @@ -52,8 +57,9 @@ func NewConfig() Config {
ExcludedSubscriptions: map[string][]string{
stats.DefaultDatabse: []string{stats.DefaultRetentionPolicy},
},
UDPBuffer: udp.DefaultBuffer,
StartUpTimeout: toml.Duration(DefaultStartUpTimeout),
UDPBuffer: udp.DefaultBuffer,
StartUpTimeout: toml.Duration(DefaultStartUpTimeout),
SubscriptionProtocol: DefaultSubscriptionProtocol,
}
}

Expand All @@ -74,5 +80,10 @@ func (c Config) Validate() error {
return err
}
}
switch c.SubscriptionProtocol {
case "http", "https", "udp":
default:
return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %s", c.SubscriptionProtocol)
}
return nil
}
110 changes: 71 additions & 39 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"net"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -40,7 +41,7 @@ type Service struct {
logger *log.Logger
}

func NewService(configs []Config, defaultInfluxDB int, hostname string, l *log.Logger) *Service {
func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, l *log.Logger) *Service {
clusterID := kapacitor.ClusterIDVar.StringValue()
subName := subNamePrefix + clusterID
clusters := make(map[string]*influxdb, len(configs))
Expand Down Expand Up @@ -84,13 +85,16 @@ func NewService(configs []Config, defaultInfluxDB int, hostname string, l *log.L
configSubs: subs,
exConfigSubs: exSubs,
hostname: hostname,
httpPort: httpPort,
logger: l,
udpBind: c.UDPBind,
udpBuffer: c.UDPBuffer,
udpReadBuffer: c.UDPReadBuffer,
startupTimeout: time.Duration(c.StartUpTimeout),
clusterID: clusterID,
subName: subName,
disableSubs: c.DisableSubscriptions,
protocol: c.SubscriptionProtocol,
}
if defaultInfluxDB == i {
defaultInfluxDBName = c.Name
Expand Down Expand Up @@ -145,7 +149,10 @@ type influxdb struct {
configSubs map[subEntry]bool
exConfigSubs map[subEntry]bool
hostname string
httpPort int
logger *log.Logger
protocol string
udpBind string
udpBuffer int
udpReadBuffer int
startupTimeout time.Duration
Expand Down Expand Up @@ -329,7 +336,34 @@ func (s *influxdb) linkSubscriptions() error {
}
existingSubs[se] = si
} else if se.name == s.subName {
existingSubs[se] = si
if len(si.Destinations) == 0 {
s.logger.Println("E! found subscription without any destinations:", se)
continue
}
u, err := url.Parse(si.Destinations[0])
if err != nil {
s.logger.Println("E! found subscription with invalid destinations:", si)
continue
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
s.logger.Println("E! found subscription with invalid destinations:", si)
continue
}
pn, err := strconv.ParseInt(port, 10, 64)
if err != nil {
s.logger.Println("E! found subscription with invalid destinations:", si)
continue
}
// Check if the hostname, port or protocol have changed
if host != s.hostname ||
u.Scheme != s.protocol ||
(u.Scheme == "http" && int(pn) != s.httpPort) {
// Something changed, drop the sub and let it get recreated
s.dropSub(cli, se.name, se.cluster, se.rp)
} else {
existingSubs[se] = si
}
}
}
}
Expand All @@ -347,12 +381,14 @@ func (s *influxdb) linkSubscriptions() error {
s.logger.Println("E! invalid URL in subscription destinations:", err)
continue
}
pair := strings.Split(u.Host, ":")
if pair[0] == s.hostname {
host, port, err := net.SplitHostPort(u.Host)
if host == s.hostname {
numSubscriptions++
_, err := s.startListener(se.cluster, se.rp, *u)
if err != nil {
s.logger.Println("E! failed to start listener:", err)
if u.Scheme == "udp" {
_, err := s.startUDPListener(se.cluster, se.rp, port)
if err != nil {
s.logger.Println("E! failed to start UDP listener:", err)
}
}
startedSubs[se] = true
break
Expand All @@ -364,19 +400,19 @@ func (s *influxdb) linkSubscriptions() error {
for _, se := range allSubs {
// If we have been configured to subscribe and the subscription is not started yet.
if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] {
u, err := url.Parse("udp://:0")
if err != nil {
return fmt.Errorf("could not create valid destination url, is hostname correct? err: %s", err)
var destination string
switch s.protocol {
case "http", "https":
destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort)
case "udp":
addr, err := s.startUDPListener(se.cluster, se.rp, "0")
if err != nil {
s.logger.Println("E! failed to start UDP listener:", err)
}
destination = fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port)
}

numSubscriptions++
addr, err := s.startListener(se.cluster, se.rp, *u)
if err != nil {
s.logger.Println("E! failed to start listener:", err)
}

// Get port from addr
destination := fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port)

err = s.createSub(cli, se.name, se.cluster, se.rp, "ANY", []string{destination})
if err != nil {
Expand Down Expand Up @@ -425,29 +461,25 @@ func (s *influxdb) dropSub(cli client.Client, name, cluster, rp string) (err err
return
}

func (s *influxdb) startListener(cluster, rp string, u url.URL) (*net.UDPAddr, error) {
switch u.Scheme {
case "udp":
c := udp.Config{}
c.Enabled = true
c.BindAddress = u.Host
c.Database = cluster
c.RetentionPolicy = rp
c.Buffer = s.udpBuffer
c.ReadBuffer = s.udpReadBuffer

l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags)
service := udp.NewService(c, l)
service.PointsWriter = s.PointsWriter
err := service.Open()
if err != nil {
return nil, err
}
s.services = append(s.services, service)
s.logger.Println("I! started UDP listener for", cluster, rp)
return service.Addr(), nil
func (s *influxdb) startUDPListener(cluster, rp, port string) (*net.UDPAddr, error) {
c := udp.Config{}
c.Enabled = true
c.BindAddress = fmt.Sprintf("%s:%s", s.udpBind, port)
c.Database = cluster
c.RetentionPolicy = rp
c.Buffer = s.udpBuffer
c.ReadBuffer = s.udpReadBuffer

l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags)
service := udp.NewService(c, l)
service.PointsWriter = s.PointsWriter
err := service.Open()
if err != nil {
return nil, err
}
return nil, fmt.Errorf("unsupported scheme %q", u.Scheme)
s.services = append(s.services, service)
s.logger.Println("I! started UDP listener for", cluster, rp)
return service.Addr(), nil
}

func (s *influxdb) execQuery(cli client.Client, q string) (*client.Response, error) {
Expand Down

0 comments on commit bee75e2

Please sign in to comment.