Skip to content

Commit

Permalink
output/influxdb: use the async periodic flusher
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Oct 19, 2021
1 parent 5127049 commit 4615d9b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion output/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewConfig() Config {
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
ConcurrentWrites: null.NewInt(10, false),
ConcurrentWrites: null.NewInt(4, false),
PushInterval: types.NewNullDuration(time.Second, false),
}
return c
Expand Down
32 changes: 17 additions & 15 deletions output/influxdb/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ type Output struct {
Config Config
BatchConf client.BatchPointsConfig

logger logrus.FieldLogger
semaphoreCh chan struct{}
fieldKinds map[string]FieldKind
logger logrus.FieldLogger
fieldKinds map[string]FieldKind
}

// New returns new influxdb output
Expand All @@ -87,11 +86,10 @@ func newOutput(params output.Params) (*Output, error) {
logger: params.Logger.WithFields(logrus.Fields{
"output": "InfluxDBv1",
}),
Client: cl,
Config: conf,
BatchConf: batchConf,
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
fieldKinds: fldKinds,
Client: cl,
Config: conf,
BatchConf: batchConf,
fieldKinds: fldKinds,
}, err
}

Expand Down Expand Up @@ -178,12 +176,15 @@ func (o *Output) Start() error {
// usually means we're either a non-admin user to an existing DB or connecting over UDP.
_, err := o.Client.Query(client.NewQuery("CREATE DATABASE "+o.BatchConf.Database, "", ""))
if err != nil {
o.logger.WithError(err).Debug("InfluxDB: Couldn't create database; most likely harmless")
o.logger.WithError(err).Debug("Couldn't create database; most likely harmless")
}

pf, err := output.NewPeriodicFlusher(time.Duration(o.Config.PushInterval.Duration), o.flushMetrics)
pf, err := output.NewAsyncPeriodicFlusher(
time.Duration(o.Config.PushInterval.Duration),
int(o.Config.ConcurrentWrites.Int64),
o.flushMetrics)
if err != nil {
return err //nolint:wrapcheck
return err
}
o.logger.Debug("Started!")
o.periodicFlusher = pf
Expand All @@ -201,11 +202,11 @@ func (o *Output) Stop() error {

func (o *Output) flushMetrics() {
samples := o.GetBufferedSamples()
if len(samples) < 1 {
o.logger.Debug("Any buffered samples, skipping the flush operation")
return
}

o.semaphoreCh <- struct{}{}
defer func() {
<-o.semaphoreCh
}()
o.logger.Debug("Committing...")
o.logger.WithField("samples", len(samples)).Debug("Writing...")

Expand All @@ -219,6 +220,7 @@ func (o *Output) flushMetrics() {
startTime := time.Now()
if err := o.Client.Write(batch); err != nil {
o.logger.WithError(err).Error("Couldn't write stats")
return
}
t := time.Since(startTime)
o.logger.WithField("t", t).Debug("Batch written!")
Expand Down

0 comments on commit 4615d9b

Please sign in to comment.