Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split StatsD and Datadog config #1655

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
jsonc "github.com/loadimpact/k6/stats/json"
"github.com/loadimpact/k6/stats/kafka"
"github.com/loadimpact/k6/stats/statsd"
"github.com/loadimpact/k6/stats/statsd/common"
)

const (
Expand Down Expand Up @@ -112,7 +111,7 @@ func getCollector(

return kafka.New(logger, config)
case collectorStatsD:
config := common.NewConfig().Apply(conf.Collectors.StatsD)
config := statsd.NewConfig().Apply(conf.Collectors.StatsD)
if err := envconfig.Process("k6_statsd", &config); err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/influxdb"
"github.com/loadimpact/k6/stats/kafka"
"github.com/loadimpact/k6/stats/statsd/common"
"github.com/loadimpact/k6/stats/statsd"
"github.com/loadimpact/k6/ui"
)

Expand Down Expand Up @@ -76,7 +76,7 @@ type Config struct {
InfluxDB influxdb.Config `json:"influxdb"`
Kafka kafka.Config `json:"kafka"`
Cloud cloud.Config `json:"cloud"`
StatsD common.Config `json:"statsd"`
StatsD statsd.Config `json:"statsd"`
Datadog datadog.Config `json:"datadog"`
CSV csv.Config `json:"csv"`
} `json:"collectors"`
Expand Down Expand Up @@ -217,7 +217,7 @@ func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf
cliConf.Collectors.InfluxDB = influxdb.NewConfig().Apply(cliConf.Collectors.InfluxDB)
cliConf.Collectors.Cloud = cloud.NewConfig().Apply(cliConf.Collectors.Cloud)
cliConf.Collectors.Kafka = kafka.NewConfig().Apply(cliConf.Collectors.Kafka)
cliConf.Collectors.StatsD = common.NewConfig().Apply(cliConf.Collectors.StatsD)
cliConf.Collectors.StatsD = statsd.NewConfig().Apply(cliConf.Collectors.StatsD)
cliConf.Collectors.Datadog = datadog.NewConfig().Apply(cliConf.Collectors.Datadog)

fileConf, _, err := readDiskConfig(fs)
Expand Down
60 changes: 51 additions & 9 deletions stats/datadog/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
package datadog

import (
"time"

"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/statsd/common"
"github.com/sirupsen/logrus"
)

type tagHandler stats.TagSet
Expand All @@ -38,17 +43,51 @@ func (t tagHandler) processTags(tags map[string]string) []string {
return res
}

// Config defines the datadog configuration
// Config defines the Datadog configuration.
type Config struct {
common.Config
Addr null.String `json:"addr,omitempty" envconfig:"K6_DATADOG_ADDR"`
BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"K6_DATADOG_BUFFER_SIZE"`
Namespace null.String `json:"namespace,omitempty" envconfig:"K6_DATADOG_NAMESPACE"`
PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"K6_DATADOG_PUSH_INTERVAL"`
TagBlacklist stats.TagSet `json:"tagBlacklist,omitempty" envconfig:"K6_DATADOG_TAG_BLACKLIST"`
}

// GetAddr returns the address of the DogStatsD service.
func (c Config) GetAddr() null.String {
return c.Addr
}

// GetBufferSize returns the size of the commands buffer.
func (c Config) GetBufferSize() null.Int {
return c.BufferSize
}

TagBlacklist stats.TagSet `json:"tagBlacklist,omitempty" envconfig:"TAG_BLACKLIST"`
// GetNamespace returns the namespace prepended to all statsd calls.
func (c Config) GetNamespace() null.String {
return c.Namespace
}

// GetPushInterval returns the time interval between outgoing data batches.
func (c Config) GetPushInterval() types.NullDuration {
return c.PushInterval
}

var _ common.Config = &Config{}

// Apply saves config non-zero config values from the passed config in the receiver.
func (c Config) Apply(cfg Config) Config {
c.Config = c.Config.Apply(cfg.Config)

if cfg.Addr.Valid {
c.Addr = cfg.Addr
}
if cfg.BufferSize.Valid {
c.BufferSize = cfg.BufferSize
}
if cfg.Namespace.Valid {
c.Namespace = cfg.Namespace
}
if cfg.PushInterval.Valid {
c.PushInterval = cfg.PushInterval
}
if cfg.TagBlacklist != nil {
c.TagBlacklist = cfg.TagBlacklist
}
Expand All @@ -59,15 +98,18 @@ func (c Config) Apply(cfg Config) Config {
// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Config: common.NewConfig(),
Addr: null.NewString("localhost:8125", false),
BufferSize: null.NewInt(20, false),
Namespace: null.NewString("k6.", false),
PushInterval: types.NewNullDuration(1*time.Second, false),
TagBlacklist: stats.TagSet{},
}
}

// New creates a new statsd connector client
// New creates a new Datadog connector client
func New(logger logrus.FieldLogger, conf Config) (*common.Collector, error) {
return &common.Collector{
Config: conf.Config,
Config: conf,
Type: "datadog",
ProcessTags: tagHandler(conf.TagBlacklist).processTags,
Logger: logger,
Expand Down
16 changes: 11 additions & 5 deletions stats/datadog/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/statsd/common"
"github.com/loadimpact/k6/stats/statsd/common/testutil"
Expand All @@ -36,11 +37,16 @@ import (
func TestCollector(t *testing.T) {
tagMap := stats.TagSet{"tag1": true, "tag2": true}
handler := tagHandler(tagMap)
testutil.BaseTest(t, func(logger logrus.FieldLogger, config common.Config) (*common.Collector, error) {
return New(testutils.NewLogger(t), NewConfig().Apply(Config{
testutil.BaseTest(t, func(
logger logrus.FieldLogger, addr, namespace null.String, bufferSize null.Int,
pushInterval types.NullDuration) (*common.Collector, error) {
return New(logger, Config{
Addr: addr,
Namespace: namespace,
BufferSize: bufferSize,
PushInterval: pushInterval,
TagBlacklist: tagMap,
Config: config,
}))
})
}, func(t *testing.T, containers []stats.SampleContainer, expectedOutput, output string) {
outputLines := strings.Split(output, "\n")
expectedOutputLines := strings.Split(expectedOutput, "\n")
Expand Down
65 changes: 64 additions & 1 deletion stats/statsd/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,73 @@
package statsd

import (
"github.com/loadimpact/k6/stats/statsd/common"
"time"

"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats/statsd/common"
)

// Config defines the StatsD configuration.
type Config struct {
Addr null.String `json:"addr,omitempty" envconfig:"K6_STATSD_ADDR"`
BufferSize null.Int `json:"bufferSize,omitempty" envconfig:"K6_STATSD_BUFFER_SIZE"`
Namespace null.String `json:"namespace,omitempty" envconfig:"K6_STATSD_NAMESPACE"`
PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"K6_STATSD_PUSH_INTERVAL"`
}

// GetAddr returns the address of the StatsD service.
func (c Config) GetAddr() null.String {
return c.Addr
}

// GetBufferSize returns the size of the commands buffer.
func (c Config) GetBufferSize() null.Int {
return c.BufferSize
}

// GetNamespace returns the namespace prepended to all statsd calls.
func (c Config) GetNamespace() null.String {
return c.Namespace
}

// GetPushInterval returns the time interval between outgoing data batches.
func (c Config) GetPushInterval() types.NullDuration {
return c.PushInterval
}

var _ common.Config = &Config{}

// Apply saves config non-zero config values from the passed config in the receiver.
func (c Config) Apply(cfg Config) Config {
if cfg.Addr.Valid {
c.Addr = cfg.Addr
}
if cfg.BufferSize.Valid {
c.BufferSize = cfg.BufferSize
}
if cfg.Namespace.Valid {
c.Namespace = cfg.Namespace
}
if cfg.PushInterval.Valid {
c.PushInterval = cfg.PushInterval
}

return c
}

// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Addr: null.NewString("localhost:8125", false),
BufferSize: null.NewInt(20, false),
Namespace: null.NewString("k6.", false),
PushInterval: types.NewNullDuration(1*time.Second, false),
}
}

// New creates a new statsd connector client
func New(logger logrus.FieldLogger, conf common.Config) (*common.Collector, error) {
return &common.Collector{
Expand Down
17 changes: 16 additions & 1 deletion stats/statsd/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,29 @@ package statsd
import (
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/statsd/common"
"github.com/loadimpact/k6/stats/statsd/common/testutil"
)

func getCollector(
logger logrus.FieldLogger, addr, namespace null.String, bufferSize null.Int,
pushInterval types.NullDuration) (*common.Collector, error) {
return New(logger, Config{
Addr: addr,
Namespace: namespace,
BufferSize: bufferSize,
PushInterval: pushInterval,
})
}

func TestCollector(t *testing.T) {
testutil.BaseTest(t, New,
testutil.BaseTest(t, getCollector,
func(t *testing.T, _ []stats.SampleContainer, expectedOutput, output string) {
require.Equal(t, expectedOutput, output)
})
Expand Down
20 changes: 15 additions & 5 deletions stats/statsd/common/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,21 @@ import (

"github.com/DataDog/datadog-go/statsd"
"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)

// Config is the common configuration interface for StatsD/Datadog.
type Config interface {
GetAddr() null.String
GetBufferSize() null.Int
GetNamespace() null.String
GetPushInterval() types.NullDuration
}

var _ lib.Collector = &Collector{}

// Collector sends result data to statsd daemons with the ability to send to datadog as well
Expand All @@ -53,7 +63,7 @@ type Collector struct {
// Init sets up the collector
func (c *Collector) Init() (err error) {
c.Logger = c.Logger.WithField("type", c.Type)
if address := c.Config.Addr.String; address == "" {
if address := c.Config.GetAddr().String; address == "" {
err = fmt.Errorf(
"connection string is invalid. Received: \"%+s\"",
address,
Expand All @@ -63,14 +73,14 @@ func (c *Collector) Init() (err error) {
return err
}

c.client, err = statsd.NewBuffered(c.Config.Addr.String, int(c.Config.BufferSize.Int64))
c.client, err = statsd.NewBuffered(c.Config.GetAddr().String, int(c.Config.GetBufferSize().Int64))

if err != nil {
c.Logger.Errorf("Couldn't make buffered client, %s", err)
return err
}

if namespace := c.Config.Namespace.String; namespace != "" {
if namespace := c.Config.GetNamespace().String; namespace != "" {
c.client.Namespace = namespace
}

Expand All @@ -79,13 +89,13 @@ func (c *Collector) Init() (err error) {

// Link returns the address of the client
func (c *Collector) Link() string {
return c.Config.Addr.String
return c.Config.GetAddr().String
}

// Run the collector
func (c *Collector) Run(ctx context.Context) {
c.Logger.Debugf("%s: Running!", c.Type)
ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration))
ticker := time.NewTicker(time.Duration(c.Config.GetPushInterval().Duration))
c.startTime = time.Now()

for {
Expand Down
33 changes: 28 additions & 5 deletions stats/statsd/common/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,35 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)

type config struct {
addr, namespace null.String
bufferSize null.Int
pushInterval types.NullDuration
}

func (c config) GetAddr() null.String {
return c.addr
}

func (c config) GetBufferSize() null.Int {
return c.bufferSize
}

func (c config) GetNamespace() null.String {
return c.namespace
}

func (c config) GetPushInterval() types.NullDuration {
return c.pushInterval
}

func TestInitWithoutAddressErrors(t *testing.T) {
c := &Collector{
Config: Config{},
Config: config{},
Type: "testtype",
Logger: testutils.NewLogger(t),
}
Expand All @@ -42,8 +65,8 @@ func TestInitWithoutAddressErrors(t *testing.T) {

func TestInitWithBogusAddressErrors(t *testing.T) {
c := &Collector{
Config: Config{
Addr: null.StringFrom("localhost:90000"),
Config: config{
addr: null.StringFrom("localhost:90000"),
},
Type: "testtype",
Logger: testutils.NewLogger(t),
Expand All @@ -55,8 +78,8 @@ func TestInitWithBogusAddressErrors(t *testing.T) {
func TestLinkReturnAddress(t *testing.T) {
bogusValue := "bogus value"
c := &Collector{
Config: Config{
Addr: null.StringFrom(bogusValue),
Config: config{
addr: null.StringFrom(bogusValue),
},
}
require.Equal(t, bogusValue, c.Link())
Expand Down
Loading