Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

influxdb: Concurrent PeriodicFlusher #2190

Merged
merged 1 commit into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions output/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ type Config struct {
// NewConfig creates a new InfluxDB output config with some default values.
func NewConfig() Config {
c := Config{
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
ConcurrentWrites: null.NewInt(10, false),
PushInterval: types.NewNullDuration(time.Second, false),
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
PushInterval: types.NewNullDuration(time.Second, false),

// The minimum value of pow(2, N) for handling a stressful situation
// with the default push interval set to 1s.
// Concurrency is not expected for the normal use-case,
// the response time should be lower than the push interval set value.
// In case of spikes, the response time could go around 2s,
// higher values will highlight a not sustainable situation
// and the user should adjust the executed script
// or the configuration based on the environment and rate expected.
ConcurrentWrites: null.NewInt(4, false),
inancgumus marked this conversation as resolved.
Show resolved Hide resolved
}
return c
}
Expand Down
69 changes: 43 additions & 26 deletions output/influxdb/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

client "github.com/influxdata/influxdb1-client/v2"
Expand Down Expand Up @@ -51,16 +52,16 @@ const (
type Output struct {
output.SampleBuffer

params output.Params
periodicFlusher *output.PeriodicFlusher

Client client.Client
Config Config
BatchConf client.BatchPointsConfig

logger logrus.FieldLogger
semaphoreCh chan struct{}
fieldKinds map[string]FieldKind
logger logrus.FieldLogger
params output.Params
fieldKinds map[string]FieldKind
periodicFlusher *output.PeriodicFlusher
semaphoreCh chan struct{}
wg sync.WaitGroup
}

// New returns new influxdb output
Expand Down Expand Up @@ -90,8 +91,9 @@ func newOutput(params output.Params) (*Output, error) {
Client: cl,
Config: conf,
BatchConf: batchConf,
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
fieldKinds: fldKinds,
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
wg: sync.WaitGroup{},
}, err
}

Expand Down Expand Up @@ -178,12 +180,12 @@ func (o *Output) Start() error {
// usually means we're either a non-admin user to an existing DB or connecting over UDP.
_, err := o.Client.Query(client.NewQuery("CREATE DATABASE "+o.BatchConf.Database, "", ""))
if err != nil {
o.logger.WithError(err).Debug("InfluxDB: Couldn't create database; most likely harmless")
o.logger.WithError(err).Debug("Couldn't create database; most likely harmless")
}

pf, err := output.NewPeriodicFlusher(time.Duration(o.Config.PushInterval.Duration), o.flushMetrics)
if err != nil {
return err //nolint:wrapcheck
return err
}
o.logger.Debug("Started!")
o.periodicFlusher = pf
Expand All @@ -196,30 +198,45 @@ func (o *Output) Stop() error {
o.logger.Debug("Stopping...")
defer o.logger.Debug("Stopped!")
o.periodicFlusher.Stop()
o.wg.Wait()
return nil
}

func (o *Output) flushMetrics() {
samples := o.GetBufferedSamples()
if len(samples) < 1 {
return
}

o.semaphoreCh <- struct{}{}
defer func() {
<-o.semaphoreCh
}()
o.logger.Debug("Committing...")
o.logger.WithField("samples", len(samples)).Debug("Writing...")
o.wg.Add(1)
o.semaphoreCh <- struct{}{}
go func() {
defer func() {
<-o.semaphoreCh
o.wg.Done()
}()

o.logger.WithField("samples", len(samples)).Debug("Writing...")

batch, err := o.batchFromSamples(samples)
if err != nil {
o.logger.WithError(err).Error("Couldn't create batch from samples")
return
}

batch, err := o.batchFromSamples(samples)
if err != nil {
o.logger.WithError(err).Error("Couldn't create batch from samples")
return
}
o.logger.WithField("points", len(batch.Points())).Debug("Writing...")
startTime := time.Now()
if err := o.Client.Write(batch); err != nil {
o.logger.WithError(err).Error("Couldn't write stats")
return
}
t := time.Since(startTime)
o.logger.WithField("t", t).Debug("Batch written!")

o.logger.WithField("points", len(batch.Points())).Debug("Writing...")
startTime := time.Now()
if err := o.Client.Write(batch); err != nil {
o.logger.WithError(err).Error("Couldn't write stats")
}
t := time.Since(startTime)
o.logger.WithField("t", t).Debug("Batch written!")
if t > time.Duration(o.Config.PushInterval.Duration) {
o.logger.WithField("t", t).
Warn("The flush operation took higher than the expected set push interval. If you see this message multiple times then the setup or configuration need to be adjusted to achieve a sustainable rate.") //nolint:lll
}
}()
}
57 changes: 57 additions & 0 deletions output/influxdb/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ import (
"io"
"net"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/lib/testutils"
Expand Down Expand Up @@ -103,6 +107,7 @@ func testOutputCycle(t testing.TB, handler http.HandlerFunc, body func(testing.T

func TestOutput(t *testing.T) {
t.Parallel()

var samplesRead int
defer func() {
require.Equal(t, samplesRead, 20)
Expand Down Expand Up @@ -140,6 +145,58 @@ func TestOutput(t *testing.T) {
})
}

func TestOutputFlushMetricsConcurrency(t *testing.T) {
t.Parallel()

var (
requests = int32(0)
block = make(chan struct{})
)
wg := sync.WaitGroup{}
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// block all the received requests
// so concurrency will be needed
// to not block the flush
atomic.AddInt32(&requests, 1)
wg.Done()
block <- struct{}{}
}))
defer func() {
// unlock the server
for i := 0; i < 4; i++ {
<-block
}
close(block)
ts.Close()
}()

o, err := newOutput(output.Params{
Logger: testutils.NewLogger(t),
ConfigArgument: ts.URL,
})
require.NoError(t, err)

for i := 0; i < 5; i++ {
select {
case o.semaphoreCh <- struct{}{}:
<-o.semaphoreCh
wg.Add(1)
o.AddMetricSamples([]stats.SampleContainer{stats.Samples{
stats.Sample{
Metric: stats.New("gauge", stats.Gauge),
Value: 2.0,
},
}})
o.flushMetrics()
default:
// the 5th request should be rate limited
assert.Equal(t, 5, i+1)
}
}
wg.Wait()
assert.Equal(t, 4, int(atomic.LoadInt32(&requests)))
}

func TestExtractTagsToValues(t *testing.T) {
t.Parallel()
o, err := newOutput(output.Params{
Expand Down