Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor batch write #22

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 114 additions & 39 deletions monitoring/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,50 @@ package monitoring

import (
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"

influxdb "github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
"github.com/theplant/appkit/log"

influxdb "github.com/influxdata/influxdb/client/v2"
)

// InfluxMonitorConfig type for configuration of Monitor that sinks to
// InfluxDB
type InfluxMonitorConfig string

type influxMonitorCfg struct {
Addr string
Username string
Password string
Database string
Addr string
Username string
Password string
Database string
BatchWriteInterval time.Duration
}

var configRegexp = regexp.MustCompile(`^(?P<scheme>https?):\/\/(?:(?P<username>.*?)(?::(?P<password>.*?)|)@)?(?P<host>.+?)\/(?P<database>.+?)$`)
const (
defaultBatchWriteInterval = time.Minute
)

var (
configRegexp = regexp.MustCompile(`^(?P<scheme>https?):\/\/(?:(?P<username>.*?)(?::(?P<password>.*?)|)@)?(?P<host>.+?)\/(?P<database>.+?)(?:\?(?P<query>.*?))?$`)
)

func parseConfig(config string) (*influxMonitorCfg, error) {
match := configRegexp.FindStringSubmatch(config)
if match == nil {
return nil, errors.New("config format error")
return nil, errors.New("influxdb config format error")
}

var scheme string
var username string
var password string
var host string
var database string
var query string
for i, name := range configRegexp.SubexpNames() {
switch name {
case "scheme":
Expand All @@ -48,19 +58,45 @@ func parseConfig(config string) (*influxMonitorCfg, error) {
host = match[i]
case "database":
database = match[i]
case "query":
query = match[i]
}
}

var batchWriteInterval time.Duration
if query != "" {
values, err := url.ParseQuery(query)
if err != nil {
return nil, errors.Wrap(err, "influxdb config query format error")
}

batchWriteSecondInterval := values.Get("batch-write-second-interval")
if batchWriteSecondInterval != "" {
second, err := strconv.Atoi(batchWriteSecondInterval)
if err != nil {
return nil, errors.Wrap(err, "influxdb config parameter batch-write-second-interval format error")
}

batchWriteInterval = time.Duration(second) * time.Second
}

}
if batchWriteInterval == 0 {
batchWriteInterval = defaultBatchWriteInterval
}

return &influxMonitorCfg{
Addr: scheme + "://" + host,
Username: username,
Password: password,
Database: database,
Addr: scheme + "://" + host,
Username: username,
Password: password,
Database: database,
BatchWriteInterval: batchWriteInterval,
}, nil
}

// NewInfluxdbMonitor creates new monitoring influxdb
// client. config URL syntax is `https://<username>:<password>@<influxDB host>/<database>`
// client. config URL syntax is `https://<username>:<password>@<influxDB host>/<database>?batch-write-second-interval=seconds`
// batch-write-second-interval is optional, default is 60.
//
// Will returns a error if monitorURL is invalid or not absolute.
//
Expand Down Expand Up @@ -90,16 +126,19 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
return nil, errors.Errorf("influxdb monitoring url %v not database", monitorURL)
}

monitor := influxdbMonitor{
monitor := &influxdbMonitor{
database: cfg.Database,
client: client,
logger: logger,

batchWriteInterval: cfg.BatchWriteInterval,
}

logger = logger.With(
"addr", cfg.Addr,
"username", cfg.Username,
"database", monitor.database,
"batch-write-second-interval", int(cfg.BatchWriteInterval/time.Second),
)

// check connectivity to InfluxDB every 5 minutes
Expand All @@ -121,11 +160,13 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
}
}()

go monitor.batchWriteTicker()

logger.Info().Log(
"msg", fmt.Sprintf("influxdb instrumentation writing to %s/%s on user %v", cfg.Addr, monitor.database, cfg.Username),
)

return &monitor, nil
return monitor, nil
}

// InfluxdbMonitor implements monitor.Monitor interface, it wraps
Expand All @@ -134,66 +175,100 @@ type influxdbMonitor struct {
client influxdb.Client
database string
logger log.Logger

batchWriteInterval time.Duration
cachePoints []*influxdb.Point
cachePointsMutex sync.Mutex
}

// InsertRecord part of monitor.Monitor.
func (im influxdbMonitor) InsertRecord(measurement string, value interface{}, tags map[string]string, fields map[string]interface{}, at time.Time) {
if fields == nil {
fields = map[string]interface{}{}
func (im *influxdbMonitor) batchWriteTicker() {
t := time.NewTicker(im.batchWriteInterval)

for {
<-t.C

im.batchWrite()
}
}

fields["value"] = value
func (im *influxdbMonitor) batchWrite() {
if len(im.cachePoints) == 0 {
return
}

bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: im.database,
})
if err != nil {
_ = im.logger.Error().Log(
"database", im.database,
"err", err,
"during", "influxdb.NewBatchPoints",
"msg", fmt.Sprintf("NewBatchPoints failed: %v", err),
)
return
}

l := im.logger.With("database", im.database,
"measurement", measurement,
"value", value,
"tags", tags)
im.cachePointsMutex.Lock()
defer im.cachePointsMutex.Unlock()

bp.AddPoints(im.cachePoints)

err = im.client.Write(bp)
if err != nil {
l.Error().Log(
_ = im.logger.Error().Log(
"database", im.database,
"err", err,
"during", "influxdb.NewBatchPoints",
"msg", fmt.Sprintf("Error initializing batch points for %s: %v", measurement, err),
"during", "influxdb.client.Write",
"msg", fmt.Sprintf("influxdb client write cache points failed: %v", err),
)
return
}

im.cachePoints = nil
}

// InsertRecord part of monitor.Monitor.
func (im *influxdbMonitor) InsertRecord(measurement string, value interface{}, tags map[string]string, fields map[string]interface{}, at time.Time) {
if fields == nil {
fields = map[string]interface{}{}
}

fields["value"] = value

pt, err := influxdb.NewPoint(measurement, tags, fields, at)

if err != nil {
l.Error().Log(
_ = im.logger.Error().Log(
"database", im.database,
"measurement", measurement,
"value", value,
"tags", tags,
"err", err,
"during", "influxdb.NewPoint",
"msg", fmt.Sprintf("Error initializing a point for %s: %v", measurement, err),
)
return
}

bp.AddPoint(pt)
im.cachePointsMutex.Lock()
defer im.cachePointsMutex.Unlock()

if err := im.client.Write(bp); err != nil {
im.logger.Error().Log(
"err", err,
"during", "influxdb.Client.Write",
"msg", fmt.Sprintf("Error inserting record into %s: %v", measurement, err),
)
}
im.cachePoints = append(im.cachePoints, pt)
}

func (im influxdbMonitor) Count(measurement string, value float64, tags map[string]string, fields map[string]interface{}) {
func (im *influxdbMonitor) Count(measurement string, value float64, tags map[string]string, fields map[string]interface{}) {
im.InsertRecord(measurement, value, tags, fields, time.Now())
}

// CountError logs a value in measurement, with the given error's
// message stored in an `error` tag.
func (im influxdbMonitor) CountError(measurement string, value float64, err error) {
func (im *influxdbMonitor) CountError(measurement string, value float64, err error) {
data := map[string]string{"error": err.Error()}
im.Count(measurement, value, data, nil)
}

// CountSimple logs a value in measurement (with no tags).
func (im influxdbMonitor) CountSimple(measurement string, value float64) {
func (im *influxdbMonitor) CountSimple(measurement string, value float64) {
im.Count(measurement, value, nil, nil)
}
Loading