Skip to content

Commit

Permalink
Merge pull request #12 from theplant/monitoring-upgrade-influxdb-client
Browse files Browse the repository at this point in the history
Upgrade influxdb client to v2
  • Loading branch information
junhuif authored May 22, 2018
2 parents a8b4e2e + 0492c71 commit fd16481
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# PR [#12](https://github.com/theplant/appkit/pull/12)

* Upgrade [`influxdb.Client`](https://github.com/influxdata/influxdb/tree/master/client#description) to `v2` version to fix the monitoring/influxdb monitor broken [issue](https://github.com/theplant/appkit/issues/11) due to the old version of `influxdb.Client` being deprecated.

# PR [#9](https://github.com/theplant/appkit/pull/9)

* Fix `contexts.HTTPStatus` to assume that the response is `http.StatusOK`, rather than `0`.
Expand Down
83 changes: 59 additions & 24 deletions monitoring/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"strings"
"time"

influxdb "github.com/influxdata/influxdb/client"
"github.com/pkg/errors"
"github.com/theplant/appkit/log"

influxdb "github.com/influxdata/influxdb/client/v2"
)

// InfluxMonitorConfig type for configuration of Monitor that sinks to
Expand All @@ -32,20 +33,42 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
return nil, errors.Errorf("influxdb monitoring url %v not absolute url", monitorURL)
}

// NewClient always returns a nil error
client, _ := influxdb.NewClient(influxdb.Config{
URL: *u,
})
username := ""
password := ""

if u.User != nil {
username = u.User.Username()
// Skips identify of "whether password is set" as password not a must
password, _ = u.User.Password()
}

httpConfig := influxdb.HTTPConfig{
Addr: fmt.Sprintf("%s://%s", u.Scheme, u.Host),
Username: username,
Password: password,
}

client, err := influxdb.NewHTTPClient(httpConfig)

if err != nil {
return nil, errors.Wrapf(err, "couldn't initialize influxdb http client with http config %+v", httpConfig)
}

database := strings.TrimLeft(u.Path, "/")

if strings.TrimSpace(database) == "" {
return nil, errors.Errorf("influxdb monitoring url %v not database", monitorURL)
}

monitor := influxdbMonitor{
database: strings.TrimLeft(u.Path, "/"),
database: database,
client: client,
logger: logger,
}

logger = logger.With(
"scheme", u.Scheme,
"username", u.User.Username(),
"username", username,
"database", monitor.database,
"host", u.Host,
)
Expand All @@ -56,7 +79,7 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,

for {
// Ignore duration, version
_, _, err = client.Ping()
_, _, err = client.Ping(5 * time.Second)
if err != nil {
logger.Warn().Log(
"err", err,
Expand All @@ -70,7 +93,7 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
}()

logger.Info().Log(
"msg", fmt.Sprintf("influxdb instrumentation writing to %s://%s@%s/%s", u.Scheme, u.User.Username(), u.Host, monitor.database),
"msg", fmt.Sprintf("influxdb instrumentation writing to %s://%s@%s/%s", u.Scheme, username, u.Host, monitor.database),
)

return &monitor, nil
Expand All @@ -79,7 +102,7 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
// InfluxdbMonitor implements monitor.Monitor interface, it wraps
// the influxdb client configuration.
type influxdbMonitor struct {
client *influxdb.Client
client influxdb.Client
database string
logger log.Logger
}
Expand All @@ -92,26 +115,38 @@ func (im influxdbMonitor) InsertRecord(measurement string, value interface{}, ta

fields["value"] = value

// Ignore response, we only care about write errors
_, err := im.client.Write(influxdb.BatchPoints{
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: im.database,
Points: []influxdb.Point{
{
Measurement: measurement,
Fields: fields,
Tags: tags,
Time: at,
},
},
})

l := im.logger.With("database", im.database,
"measurement", measurement,
"value", value,
"tags", tags)

if err != nil {
l.Error().Log(
"err", err,
"during", "influxdb.NewBatchPoints",
"msg", fmt.Sprintf("Error initializing batch points for %s: %v", measurement, err),
)
}

pt, err := influxdb.NewPoint(measurement, tags, fields, at)

if err != nil {
l.Error().Log(
"err", err,
"during", "influxdb.NewPoint",
"msg", fmt.Sprintf("Error initializing a point for %s: %v", measurement, err),
)
}

bp.AddPoint(pt)

if err := im.client.Write(bp); err != nil {
im.logger.Error().Log(
"err", err,
"database", im.database,
"measurement", measurement,
"value", value,
"tags", tags,
"during", "influxdb.Client.Write",
"msg", fmt.Sprintf("Error inserting record into %s: %v", measurement, err),
)
Expand Down
44 changes: 44 additions & 0 deletions monitoring/influxdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package monitoring

import (
"strings"
"testing"

"github.com/theplant/appkit/log"
)

func TestInvalidInfluxdbConfig(t *testing.T) {
logger := log.NewNopLogger()
cases := map[string]string{
"not absolute url": "",
"Unsupported protocol scheme": "localhost:8086/local",
"not database": "http://root:password@localhost:8086",
}

for reason, config := range cases {
_, err := NewInfluxdbMonitor(InfluxMonitorConfig(config), logger)

if err == nil || !strings.Contains(err.Error(), reason) {
t.Fatalf("no error creating influxdb monitor with config url %s", config)
}
}
}

func TestValidInfluxdbConfig(t *testing.T) {
logger := log.NewNopLogger()
cases := []string{
"http://localhost:8086/local",
"https://localhost:8086/local",
"https://root@localhost:8086/local",
"https://:password@localhost:8086/local",
"https://root:password@localhost:8086/local",
}

for _, config := range cases {
_, err := NewInfluxdbMonitor(InfluxMonitorConfig(config), logger)

if err != nil {
t.Fatalf("error creating influxdb monitor with config url %s", config)
}
}
}

0 comments on commit fd16481

Please sign in to comment.