diff --git a/cmd/collectors.go b/cmd/collectors.go index fc67c89c088..dc954884dce 100644 --- a/cmd/collectors.go +++ b/cmd/collectors.go @@ -42,7 +42,6 @@ import ( jsonc "github.com/loadimpact/k6/stats/json" "github.com/loadimpact/k6/stats/kafka" "github.com/loadimpact/k6/stats/statsd" - "github.com/loadimpact/k6/stats/statsd/common" ) const ( @@ -112,7 +111,7 @@ func getCollector( return kafka.New(logger, config) case collectorStatsD: - config := common.NewConfig().Apply(conf.Collectors.StatsD) + config := statsd.NewConfig().Apply(conf.Collectors.StatsD) if err := envconfig.Process("k6_statsd", &config); err != nil { return nil, err } diff --git a/cmd/config.go b/cmd/config.go index 8d635548b42..54c7cddbd22 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -41,7 +41,7 @@ import ( "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" "github.com/loadimpact/k6/stats/kafka" - "github.com/loadimpact/k6/stats/statsd/common" + "github.com/loadimpact/k6/stats/statsd" "github.com/loadimpact/k6/ui" ) @@ -76,7 +76,7 @@ type Config struct { InfluxDB influxdb.Config `json:"influxdb"` Kafka kafka.Config `json:"kafka"` Cloud cloud.Config `json:"cloud"` - StatsD common.Config `json:"statsd"` + StatsD statsd.Config `json:"statsd"` Datadog datadog.Config `json:"datadog"` CSV csv.Config `json:"csv"` } `json:"collectors"` @@ -217,7 +217,7 @@ func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf cliConf.Collectors.InfluxDB = influxdb.NewConfig().Apply(cliConf.Collectors.InfluxDB) cliConf.Collectors.Cloud = cloud.NewConfig().Apply(cliConf.Collectors.Cloud) cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka) - cliConf.Collectors.StatsD = common.NewConfig().Apply(cliConf.Collectors.StatsD) + cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD) cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog) fileConf, _, err := readDiskConfig(fs) diff --git a/stats/datadog/collector.go b/stats/datadog/collector.go index 98f3df3803c..415adff68ee 100644 --- a/stats/datadog/collector.go +++ b/stats/datadog/collector.go @@ -21,9 +21,14 @@ package datadog import ( + "time" + + "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" + + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/statsd/common" - "github.com/sirupsen/logrus" ) type tagHandler stats.TagSet @@ -38,17 +43,51 @@ func (t tagHandler) processTags(tags map[string]string) []string { return res } -// Config defines the datadog configuration +// Config defines the Datadog configuration. type Config struct { - common.Config + Addr null.String `json:"addr,omitempty" envconfig:"K6_DATADOG_ADDR"` + BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"K6_DATADOG_BUFFER_SIZE"` + Namespace null.String `json:"namespace,omitempty" envconfig:"K6_DATADOG_NAMESPACE"` + PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"K6_DATADOG_PUSH_INTERVAL"` + TagBlacklist stats.TagSet `json:"tagBlacklist,omitempty" envconfig:"K6_DATADOG_TAG_BLACKLIST"` +} + +// GetAddr returns the address of the DogStatsD service. +func (c Config) GetAddr() null.String { + return c.Addr +} + +// GetBufferSize returns the size of the commands buffer. +func (c Config) GetBufferSize() null.Int { + return c.BufferSize +} - TagBlacklist stats.TagSet `json:"tagBlacklist,omitempty" envconfig:"TAG_BLACKLIST"` +// GetNamespace returns the namespace prepended to all statsd calls. +func (c Config) GetNamespace() null.String { + return c.Namespace } +// GetPushInterval returns the time interval between outgoing data batches. +func (c Config) GetPushInterval() types.NullDuration { + return c.PushInterval +} + +var _ common.Config = &Config{} + // Apply saves config non-zero config values from the passed config in the receiver. func (c Config) Apply(cfg Config) Config { - c.Config = c.Config.Apply(cfg.Config) - + if cfg.Addr.Valid { + c.Addr = cfg.Addr + } + if cfg.BufferSize.Valid { + c.BufferSize = cfg.BufferSize + } + if cfg.Namespace.Valid { + c.Namespace = cfg.Namespace + } + if cfg.PushInterval.Valid { + c.PushInterval = cfg.PushInterval + } if cfg.TagBlacklist != nil { c.TagBlacklist = cfg.TagBlacklist } @@ -59,15 +98,18 @@ func (c Config) Apply(cfg Config) Config { // NewConfig creates a new Config instance with default values for some fields. func NewConfig() Config { return Config{ - Config: common.NewConfig(), + Addr: null.NewString("localhost:8125", false), + BufferSize: null.NewInt(20, false), + Namespace: null.NewString("k6.", false), + PushInterval: types.NewNullDuration(1*time.Second, false), TagBlacklist: stats.TagSet{}, } } -// New creates a new statsd connector client +// New creates a new Datadog connector client func New(logger logrus.FieldLogger, conf Config) (*common.Collector, error) { return &common.Collector{ - Config: conf.Config, + Config: conf, Type: "datadog", ProcessTags: tagHandler(conf.TagBlacklist).processTags, Logger: logger, diff --git a/stats/datadog/collector_test.go b/stats/datadog/collector_test.go index e86d348ec70..28f2b194a74 100644 --- a/stats/datadog/collector_test.go +++ b/stats/datadog/collector_test.go @@ -26,8 +26,9 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" - "github.com/loadimpact/k6/lib/testutils" + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/stats/statsd/common" "github.com/loadimpact/k6/stats/statsd/common/testutil" @@ -36,11 +37,16 @@ import ( func TestCollector(t *testing.T) { tagMap := stats.TagSet{"tag1": true, "tag2": true} handler := tagHandler(tagMap) - testutil.BaseTest(t, func(logger logrus.FieldLogger, config common.Config) (*common.Collector, error) { - return New(testutils.NewLogger(t), NewConfig().Apply(Config{ + testutil.BaseTest(t, func( + logger logrus.FieldLogger, addr, namespace null.String, bufferSize null.Int, + pushInterval types.NullDuration) (*common.Collector, error) { + return New(logger, Config{ + Addr: addr, + Namespace: namespace, + BufferSize: bufferSize, + PushInterval: pushInterval, TagBlacklist: tagMap, - Config: config, - })) + }) }, func(t *testing.T, containers []stats.SampleContainer, expectedOutput, output string) { outputLines := strings.Split(output, "\n") expectedOutputLines := strings.Split(expectedOutput, "\n") diff --git a/stats/statsd/collector.go b/stats/statsd/collector.go index b0e669736ae..b8d5f2a62b7 100644 --- a/stats/statsd/collector.go +++ b/stats/statsd/collector.go @@ -21,10 +21,73 @@ package statsd import ( - "github.com/loadimpact/k6/stats/statsd/common" + "time" + "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" + + "github.com/loadimpact/k6/lib/types" + "github.com/loadimpact/k6/stats/statsd/common" ) +// Config defines the StatsD configuration. +type Config struct { + Addr null.String `json:"addr,omitempty" envconfig:"K6_STATSD_ADDR"` + BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"K6_STATSD_BUFFER_SIZE"` + Namespace null.String `json:"namespace,omitempty" envconfig:"K6_STATSD_NAMESPACE"` + PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"K6_STATSD_PUSH_INTERVAL"` +} + +// GetAddr returns the address of the StatsD service. +func (c Config) GetAddr() null.String { + return c.Addr +} + +// GetBufferSize returns the size of the commands buffer. +func (c Config) GetBufferSize() null.Int { + return c.BufferSize +} + +// GetNamespace returns the namespace prepended to all statsd calls. +func (c Config) GetNamespace() null.String { + return c.Namespace +} + +// GetPushInterval returns the time interval between outgoing data batches. +func (c Config) GetPushInterval() types.NullDuration { + return c.PushInterval +} + +var _ common.Config = &Config{} + +// Apply saves config non-zero config values from the passed config in the receiver. +func (c Config) Apply(cfg Config) Config { + if cfg.Addr.Valid { + c.Addr = cfg.Addr + } + if cfg.BufferSize.Valid { + c.BufferSize = cfg.BufferSize + } + if cfg.Namespace.Valid { + c.Namespace = cfg.Namespace + } + if cfg.PushInterval.Valid { + c.PushInterval = cfg.PushInterval + } + + return c +} + +// NewConfig creates a new Config instance with default values for some fields. +func NewConfig() Config { + return Config{ + Addr: null.NewString("localhost:8125", false), + BufferSize: null.NewInt(20, false), + Namespace: null.NewString("k6.", false), + PushInterval: types.NewNullDuration(1*time.Second, false), + } +} + // New creates a new statsd connector client func New(logger logrus.FieldLogger, conf common.Config) (*common.Collector, error) { return &common.Collector{ diff --git a/stats/statsd/collector_test.go b/stats/statsd/collector_test.go index 10745cc70f0..ad753b02587 100644 --- a/stats/statsd/collector_test.go +++ b/stats/statsd/collector_test.go @@ -23,14 +23,29 @@ package statsd import ( "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/stats/statsd/common" "github.com/loadimpact/k6/stats/statsd/common/testutil" ) +func getCollector( + logger logrus.FieldLogger, addr, namespace null.String, bufferSize null.Int, + pushInterval types.NullDuration) (*common.Collector, error) { + return New(logger, Config{ + Addr: addr, + Namespace: namespace, + BufferSize: bufferSize, + PushInterval: pushInterval, + }) +} + func TestCollector(t *testing.T) { - testutil.BaseTest(t, New, + testutil.BaseTest(t, getCollector, func(t *testing.T, _ []stats.SampleContainer, expectedOutput, output string) { require.Equal(t, expectedOutput, output) }) diff --git a/stats/statsd/common/collector.go b/stats/statsd/common/collector.go index cc456aa3511..6abde5f1a60 100644 --- a/stats/statsd/common/collector.go +++ b/stats/statsd/common/collector.go @@ -28,11 +28,21 @@ import ( "github.com/DataDog/datadog-go/statsd" "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" ) +// Config is the common configuration interface for StatsD/Datadog. +type Config interface { + GetAddr() null.String + GetBufferSize() null.Int + GetNamespace() null.String + GetPushInterval() types.NullDuration +} + var _ lib.Collector = &Collector{} // Collector sends result data to statsd daemons with the ability to send to datadog as well @@ -53,7 +63,7 @@ type Collector struct { // Init sets up the collector func (c *Collector) Init() (err error) { c.Logger = c.Logger.WithField("type", c.Type) - if address := c.Config.Addr.String; address == "" { + if address := c.Config.GetAddr().String; address == "" { err = fmt.Errorf( "connection string is invalid. Received: \"%+s\"", address, @@ -63,14 +73,14 @@ func (c *Collector) Init() (err error) { return err } - c.client, err = statsd.NewBuffered(c.Config.Addr.String, int(c.Config.BufferSize.Int64)) + c.client, err = statsd.NewBuffered(c.Config.GetAddr().String, int(c.Config.GetBufferSize().Int64)) if err != nil { c.Logger.Errorf("Couldn't make buffered client, %s", err) return err } - if namespace := c.Config.Namespace.String; namespace != "" { + if namespace := c.Config.GetNamespace().String; namespace != "" { c.client.Namespace = namespace } @@ -79,13 +89,13 @@ func (c *Collector) Init() (err error) { // Link returns the address of the client func (c *Collector) Link() string { - return c.Config.Addr.String + return c.Config.GetAddr().String } // Run the collector func (c *Collector) Run(ctx context.Context) { c.Logger.Debugf("%s: Running!", c.Type) - ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration)) + ticker := time.NewTicker(time.Duration(c.Config.GetPushInterval().Duration)) c.startTime = time.Now() for { diff --git a/stats/statsd/common/collector_test.go b/stats/statsd/common/collector_test.go index 00651623af4..53b0423ccdf 100644 --- a/stats/statsd/common/collector_test.go +++ b/stats/statsd/common/collector_test.go @@ -27,12 +27,35 @@ import ( "gopkg.in/guregu/null.v3" "github.com/loadimpact/k6/lib/testutils" + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" ) +type config struct { + addr, namespace null.String + bufferSize null.Int + pushInterval types.NullDuration +} + +func (c config) GetAddr() null.String { + return c.addr +} + +func (c config) GetBufferSize() null.Int { + return c.bufferSize +} + +func (c config) GetNamespace() null.String { + return c.namespace +} + +func (c config) GetPushInterval() types.NullDuration { + return c.pushInterval +} + func TestInitWithoutAddressErrors(t *testing.T) { c := &Collector{ - Config: Config{}, + Config: config{}, Type: "testtype", Logger: testutils.NewLogger(t), } @@ -42,8 +65,8 @@ func TestInitWithoutAddressErrors(t *testing.T) { func TestInitWithBogusAddressErrors(t *testing.T) { c := &Collector{ - Config: Config{ - Addr: null.StringFrom("localhost:90000"), + Config: config{ + addr: null.StringFrom("localhost:90000"), }, Type: "testtype", Logger: testutils.NewLogger(t), @@ -55,8 +78,8 @@ func TestInitWithBogusAddressErrors(t *testing.T) { func TestLinkReturnAddress(t *testing.T) { bogusValue := "bogus value" c := &Collector{ - Config: Config{ - Addr: null.StringFrom(bogusValue), + Config: config{ + addr: null.StringFrom(bogusValue), }, } require.Equal(t, bogusValue, c.Link()) diff --git a/stats/statsd/common/config.go b/stats/statsd/common/config.go deleted file mode 100644 index a640f61568b..00000000000 --- a/stats/statsd/common/config.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2019 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package common - -import ( - "time" - - "gopkg.in/guregu/null.v3" - - "github.com/loadimpact/k6/lib/types" -) - -// Config defines the statsd configuration -type Config struct { - Addr null.String `json:"addr,omitempty" envconfig:"ADDR"` - BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"BUFFER_SIZE"` - Namespace null.String `json:"namespace,omitempty" envconfig:"NAMESPACE"` - PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"PUSH_INTERVAL"` -} - -// NewConfig creates a new Config instance with default values for some fields. -func NewConfig() Config { - return Config{ - Addr: null.NewString("localhost:8125", false), - BufferSize: null.NewInt(20, false), - Namespace: null.NewString("k6.", false), - PushInterval: types.NewNullDuration(1*time.Second, false), - } -} - -// Apply saves config non-zero config values from the passed config in the receiver. -func (c Config) Apply(cfg Config) Config { - if cfg.Addr.Valid { - c.Addr = cfg.Addr - } - - if cfg.BufferSize.Valid { - c.BufferSize = cfg.BufferSize - } - - if cfg.Namespace.Valid { - c.Namespace = cfg.Namespace - } - - if cfg.PushInterval.Valid { - c.PushInterval = cfg.PushInterval - } - - return c -} diff --git a/stats/statsd/common/testutil/test_helper.go b/stats/statsd/common/testutil/test_helper.go index f79c27d2ff4..2d49fba25ed 100644 --- a/stats/statsd/common/testutil/test_helper.go +++ b/stats/statsd/common/testutil/test_helper.go @@ -36,9 +36,16 @@ import ( "github.com/loadimpact/k6/stats/statsd/common" ) -// BaseTest is a helper function to test statsd/datadog collector throughtly +type getCollectorFn func( + logger logrus.FieldLogger, + addr, namespace null.String, + bufferSize null.Int, + pushInterval types.NullDuration, +) (*common.Collector, error) + +// BaseTest is a helper function to test statsd/datadog collector func BaseTest(t *testing.T, - getCollector func(logrus.FieldLogger, common.Config) (*common.Collector, error), + getCollector getCollectorFn, checkResult func(t *testing.T, samples []stats.SampleContainer, expectedOutput, output string), ) { t.Helper() @@ -66,14 +73,15 @@ func BaseTest(t *testing.T, } } }() - baseConfig := common.NewConfig().Apply(common.Config{ - Addr: null.StringFrom(listener.LocalAddr().String()), - Namespace: null.StringFrom(testNamespace), - BufferSize: null.IntFrom(5), - PushInterval: types.NullDurationFrom(time.Millisecond * 10), - }) - collector, err := getCollector(testutils.NewLogger(t), baseConfig) + pushInterval := types.NullDurationFrom(time.Millisecond * 10) + collector, err := getCollector( + testutils.NewLogger(t), + null.StringFrom(listener.LocalAddr().String()), + null.StringFrom(testNamespace), + null.IntFrom(5), + pushInterval, + ) require.NoError(t, err) require.NoError(t, collector.Init()) ctx, cancel := context.WithCancel(context.Background()) @@ -149,7 +157,7 @@ func BaseTest(t *testing.T, } for _, test := range testMatrix { collector.Collect(test.input) - time.Sleep((time.Duration)(baseConfig.PushInterval.Duration)) + time.Sleep((time.Duration)(pushInterval.Duration)) output := <-ch checkResult(t, test.input, test.output, output) }