Skip to content

Commit

Permalink
Merge pull request #219 from influxdata/nc-issue#187
Browse files Browse the repository at this point in the history
Retry connecting to InfluxDB on startup
  • Loading branch information
Nathaniel Cook committed Feb 5, 2016
2 parents 99dc918 + a0251e2 commit eed6eea
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ There was a breaking change to the `define` command, see [#173](https://github.c
- [#208](https://github.com/influxdata/kapacitor/issues/208): Add default stats dbrp to default subscription excludes.
- [#203](https://github.com/influxdata/kapacitor/issues/203): Fix hang when deleteing invalid batch task.
- [#182](https://github.com/influxdata/kapacitor/issues/182): Fix missing/incorrect Content-Type headers for various HTTP endpoints.
- [#187](https://github.com/influxdata/kapacitor/issues/187): Retry connecting to InfluxDB on startup for up to 5 minutes by default.

## v0.10.0 [2016-01-26]

Expand Down
12 changes: 10 additions & 2 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@ import (

"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdb/influxdb/toml"
)

const (
// Maximum time to try and connect to InfluxDB during startup.
DefaultStartUpTimeout = time.Minute * 5
)

type Config struct {
Enabled bool `toml:"enabled"`
URLs []string `toml:"urls"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout time.Duration `toml:"timeout"`
Timeout toml.Duration `toml:"timeout"`
Subscriptions map[string][]string `toml:"subscriptions"`
ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"`
UDPBuffer int `toml:"udp-buffer"`
UDPReadBuffer int `toml:"udp-read-buffer"`
StartUpTimeout toml.Duration `toml:"startup-timeout"`
}

func NewConfig() Config {
Expand All @@ -30,7 +37,8 @@ func NewConfig() Config {
ExcludedSubscriptions: map[string][]string{
stats.DefaultDatabse: []string{stats.DefaultRetentionPolicy},
},
UDPBuffer: udp.DefaultBuffer,
UDPBuffer: udp.DefaultBuffer,
StartUpTimeout: toml.Duration(DefaultStartUpTimeout),
}
}

Expand Down
54 changes: 36 additions & 18 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,31 @@ import (
"net"
"net/url"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/cluster"
)

const (
// The name to give to all subscriptions
subName = "kapacitor"
)

// Handles requests to write or read from an InfluxDB cluster
type Service struct {
configs []client.Config
i int
configSubs map[subEntry]bool
exConfigSubs map[subEntry]bool
hostname string
logger *log.Logger
udpBuffer int
udpReadBuffer int
configs []client.Config
i int
configSubs map[subEntry]bool
exConfigSubs map[subEntry]bool
hostname string
logger *log.Logger
udpBuffer int
udpReadBuffer int
startupTimeout time.Duration

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand Down Expand Up @@ -61,7 +65,7 @@ func NewService(c Config, hostname string, l *log.Logger) *Service {
Username: c.Username,
Password: c.Password,
UserAgent: "Kapacitor",
Timeout: c.Timeout,
Timeout: time.Duration(c.Timeout),
}
}
subs := make(map[subEntry]bool, len(c.Subscriptions))
Expand All @@ -79,13 +83,14 @@ func NewService(c Config, hostname string, l *log.Logger) *Service {
}
}
return &Service{
configs: configs,
configSubs: subs,
exConfigSubs: exSubs,
hostname: hostname,
logger: l,
udpBuffer: c.UDPBuffer,
udpReadBuffer: c.UDPReadBuffer,
configs: configs,
configSubs: subs,
exConfigSubs: exSubs,
hostname: hostname,
logger: l,
udpBuffer: c.UDPBuffer,
udpReadBuffer: c.UDPReadBuffer,
startupTimeout: time.Duration(c.StartUpTimeout),
}
}

Expand All @@ -111,7 +116,6 @@ func (s *Service) Addr() string {
}

func (s *Service) NewClient() (c *client.Client, err error) {

tries := 0
for tries < len(s.configs) {
tries++
Expand All @@ -131,7 +135,21 @@ func (s *Service) NewClient() (c *client.Client, err error) {
}

func (s *Service) linkSubscriptions() error {
cli, err := s.NewClient()
s.logger.Println("I! linking subscriptions")
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = s.startupTimeout
ticker := backoff.NewTicker(b)
var err error
var cli *client.Client
for range ticker.C {
cli, err = s.NewClient()
if err != nil {
s.logger.Println("D! failed to connect to InfluxDB, retrying... ", err)
continue
}
ticker.Stop()
break
}
if err != nil {
return err
}
Expand Down

0 comments on commit eed6eea

Please sign in to comment.