Skip to content

Commit

Permalink
monitoring: add cache points and batch write
Browse files Browse the repository at this point in the history
  • Loading branch information
lifenod committed Dec 28, 2018
1 parent d02f612 commit 3bebb6a
Showing 1 changed file with 71 additions and 29 deletions.
100 changes: 71 additions & 29 deletions monitoring/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"

influxdb "github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
"github.com/theplant/appkit/log"

influxdb "github.com/influxdata/influxdb/client/v2"
)

// InfluxMonitorConfig type for configuration of Monitor that sinks to
// InfluxDB
// TODO Add write cache points interval config?
type InfluxMonitorConfig string

type influxMonitorCfg struct {
Expand All @@ -23,7 +24,13 @@ type influxMonitorCfg struct {
Database string
}

var configRegexp = regexp.MustCompile(`^(?P<scheme>https?):\/\/(?:(?P<username>.*?)(?::(?P<password>.*?)|)@)?(?P<host>.+?)\/(?P<database>.+?)$`)
const (
defaultBatchWriteNanosecondInterval = time.Minute
)

var (
configRegexp = regexp.MustCompile(`^(?P<scheme>https?):\/\/(?:(?P<username>.*?)(?::(?P<password>.*?)|)@)?(?P<host>.+?)\/(?P<database>.+?)$`)
)

func parseConfig(config string) (*influxMonitorCfg, error) {
match := configRegexp.FindStringSubmatch(config)
Expand Down Expand Up @@ -90,7 +97,7 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
return nil, errors.Errorf("influxdb monitoring url %v not database", monitorURL)
}

monitor := influxdbMonitor{
monitor := &influxdbMonitor{
database: cfg.Database,
client: client,
logger: logger,
Expand Down Expand Up @@ -121,11 +128,13 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
}
}()

go monitor.batchWriteTicker()

logger.Info().Log(
"msg", fmt.Sprintf("influxdb instrumentation writing to %s/%s on user %v", cfg.Addr, monitor.database, cfg.Username),
)

return &monitor, nil
return monitor, nil
}

// InfluxdbMonitor implements monitor.Monitor interface, it wraps
Expand All @@ -134,66 +143,99 @@ type influxdbMonitor struct {
client influxdb.Client
database string
logger log.Logger

cachePoints []*influxdb.Point
cachePointsMutex sync.Mutex
}

// InsertRecord part of monitor.Monitor.
func (im influxdbMonitor) InsertRecord(measurement string, value interface{}, tags map[string]string, fields map[string]interface{}, at time.Time) {
if fields == nil {
fields = map[string]interface{}{}
func (im *influxdbMonitor) batchWriteTicker() {
t := time.NewTicker(defaultBatchWriteNanosecondInterval)

for {
<-t.C

im.batchWrite()
}
}

fields["value"] = value
func (im *influxdbMonitor) batchWrite() {
if len(im.cachePoints) == 0 {
return
}

bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: im.database,
})
if err != nil {
_ = im.logger.Error().Log(
"database", im.database,
"err", err,
"during", "influxdb.NewBatchPoints",
"msg", fmt.Sprintf("NewBatchPoints failed: %v", err),
)
return
}

l := im.logger.With("database", im.database,
"measurement", measurement,
"value", value,
"tags", tags)
im.cachePointsMutex.Lock()
defer im.cachePointsMutex.Unlock()

bp.AddPoints(im.cachePoints)

err = im.client.Write(bp)
if err != nil {
l.Error().Log(
_ = im.logger.Error().Log(
"database", im.database,
"err", err,
"during", "influxdb.NewBatchPoints",
"msg", fmt.Sprintf("Error initializing batch points for %s: %v", measurement, err),
"during", "influxdb.client.Write",
"msg", fmt.Sprintf("influxdb client write cache points failed: %v", err),
)
return
}

im.cachePoints = nil
}

// InsertRecord part of monitor.Monitor.
func (im *influxdbMonitor) InsertRecord(measurement string, value interface{}, tags map[string]string, fields map[string]interface{}, at time.Time) {
if fields == nil {
fields = map[string]interface{}{}
}

fields["value"] = value

pt, err := influxdb.NewPoint(measurement, tags, fields, at)

if err != nil {
l.Error().Log(
_ = im.logger.Error().Log(
"database", im.database,
"measurement", measurement,
"value", value,
"tags", tags,
"err", err,
"during", "influxdb.NewPoint",
"msg", fmt.Sprintf("Error initializing a point for %s: %v", measurement, err),
)
return
}

bp.AddPoint(pt)
im.cachePointsMutex.Lock()
defer im.cachePointsMutex.Unlock()

if err := im.client.Write(bp); err != nil {
im.logger.Error().Log(
"err", err,
"during", "influxdb.Client.Write",
"msg", fmt.Sprintf("Error inserting record into %s: %v", measurement, err),
)
}
im.cachePoints = append(im.cachePoints, pt)
}

func (im influxdbMonitor) Count(measurement string, value float64, tags map[string]string, fields map[string]interface{}) {
func (im *influxdbMonitor) Count(measurement string, value float64, tags map[string]string, fields map[string]interface{}) {
im.InsertRecord(measurement, value, tags, fields, time.Now())
}

// CountError logs a value in measurement, with the given error's
// message stored in an `error` tag.
func (im influxdbMonitor) CountError(measurement string, value float64, err error) {
func (im *influxdbMonitor) CountError(measurement string, value float64, err error) {
data := map[string]string{"error": err.Error()}
im.Count(measurement, value, data, nil)
}

// CountSimple logs a value in measurement (with no tags).
func (im influxdbMonitor) CountSimple(measurement string, value float64) {
func (im *influxdbMonitor) CountSimple(measurement string, value float64) {
im.Count(measurement, value, nil, nil)
}

0 comments on commit 3bebb6a

Please sign in to comment.