Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add output option for csv format #1067

Merged
merged 19 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/consts"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/influxdb"
jsonc "github.com/loadimpact/k6/stats/json"
Expand All @@ -47,6 +48,7 @@ const (
collectorCloud = "cloud"
collectorStatsD = "statsd"
collectorDatadog = "datadog"
collectorCSV = "csv"
)

func parseCollector(s string) (t, arg string) {
Expand Down Expand Up @@ -111,6 +113,19 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) (
return nil, err
}
return datadog.New(config)
case collectorCSV:
config := csv.NewConfig().Apply(conf.Collectors.CSV)
if err := envconfig.Process("k6", &config); err != nil {
return nil, err
}
if arg != "" {
cmdConfig, err := csv.ParseArg(arg)
if err != nil {
return nil, err
}
config = config.Apply(cmdConfig)
}
return csv.New(afero.NewOsFs(), conf.SystemTags, config)
default:
return nil, errors.Errorf("unknown output type: %s", collectorName)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/loadimpact/k6/lib/scheduler"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/influxdb"
"github.com/loadimpact/k6/stats/kafka"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Config struct {
Cloud cloud.Config `json:"cloud"`
StatsD common.Config `json:"statsd"`
Datadog datadog.Config `json:"datadog"`
CSV csv.Config `json:"csv"`
} `json:"collectors"`
}

Expand All @@ -97,6 +99,7 @@ func (c Config) Apply(cfg Config) Config {
c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka)
c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD)
c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog)
c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV)
return c
}

Expand Down
237 changes: 237 additions & 0 deletions stats/csv/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2016 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package csv

import (
"bytes"
"context"
"encoding/csv"
"fmt"
"io"
"os"
"sort"
"sync"
"time"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
)

// Collector saving output to csv implements the lib.Collector interface
type Collector struct {
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
outfile io.WriteCloser
fname string
resTags []string
ignoredTags []string
csvWriter *csv.Writer
csvLock sync.Mutex
buffer []stats.Sample
bufferLock sync.Mutex
row []string
saveInterval time.Duration
}

// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}

// Similar to ioutil.NopCloser, but for writers
type nopCloser struct {
io.Writer
}

func (nopCloser) Close() error { return nil }

// New Creates new instance of CSV collector
func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) {
resTags := []string{}
ignoredTags := []string{}
for tag, flag := range tags {
if flag {
resTags = append(resTags, tag)
} else {
ignoredTags = append(ignoredTags, tag)
}
}
sort.Strings(resTags)
sort.Strings(ignoredTags)

saveInterval := time.Duration(config.SaveInterval.Duration)
fname := config.FileName.String

if fname == "" || fname == "-" {
logfile := nopCloser{os.Stdout}
return &Collector{
outfile: logfile,
fname: "-",
resTags: resTags,
ignoredTags: ignoredTags,
csvWriter: csv.NewWriter(logfile),
row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1),
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
saveInterval: saveInterval,
}, nil
}

logfile, err := fs.Create(fname)
if err != nil {
return nil, err
}

return &Collector{
outfile: logfile,
fname: fname,
resTags: resTags,
ignoredTags: ignoredTags,
csvWriter: csv.NewWriter(logfile),
row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1),
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
saveInterval: saveInterval,
}, nil
}

// Init writes column names to csv file
func (c *Collector) Init() error {
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
header := MakeHeader(c.resTags)
err := c.csvWriter.Write(header)
if err != nil {
logrus.WithField("filename", c.fname).Error("CSV: Error writing column names to file")
}
c.csvWriter.Flush()
return nil
}

// SetRunStatus does nothing
func (c *Collector) SetRunStatus(status lib.RunStatus) {}
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved

// Run just blocks until the context is done
func (c *Collector) Run(ctx context.Context) {
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(c.saveInterval)
for {
select {
case <-ticker.C:
c.WriteToFile()
case <-ctx.Done():
c.WriteToFile()
err := c.outfile.Close()
if err != nil {
logrus.WithField("filename", c.fname).Error("CSV: Error closing the file")
}
return
}
}
}

// Collect Saves samples to buffer
func (c *Collector) Collect(scs []stats.SampleContainer) {
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
c.bufferLock.Lock()
defer c.bufferLock.Unlock()
for _, sc := range scs {
c.buffer = append(c.buffer, sc.GetSamples()...)
}
}

// WriteToFile Writes samples to the csv file
func (c *Collector) WriteToFile() {
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
c.bufferLock.Unlock()

if len(samples) > 0 {
c.csvLock.Lock()
defer c.csvLock.Unlock()
for _, sc := range samples {
for _, sample := range sc.GetSamples() {
sample := sample
row := SampleToRow(&sample, c.resTags, c.ignoredTags, c.row)
err := c.csvWriter.Write(row)
if err != nil {
logrus.WithField("filename", c.fname).Error("CSV: Error writing to file")
}
}
}
c.csvWriter.Flush()
}
}

// Link returns a dummy string, it's only included to satisfy the lib.Collector interface
func (c *Collector) Link() string {
return c.fname
}

// MakeHeader creates list of column names for csv file
func MakeHeader(tags []string) []string {
Sirozha1337 marked this conversation as resolved.
Show resolved Hide resolved
tags = append(tags, "extra_tags")
return append([]string{"metric_name", "timestamp", "metric_value"}, tags...)
}

// SampleToRow converts sample into array of strings
func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, row []string) []string {
row[0] = sample.Metric.Name
row[1] = fmt.Sprintf("%d", sample.Time.Unix())
row[2] = fmt.Sprintf("%f", sample.Value)
sampleTags := sample.Tags.CloneTags()
na-- marked this conversation as resolved.
Show resolved Hide resolved

for ind, tag := range resTags {
row[ind+3] = sampleTags[tag]
}

extraTags := bytes.Buffer{}
prev := false
for tag, val := range sampleTags {
if !IsStringInSlice(resTags, tag) && !IsStringInSlice(ignoredTags, tag) {
if prev {
if _, err := extraTags.WriteString("&"); err != nil {
break
}
}

if _, err := extraTags.WriteString(tag); err != nil {
break
}

if _, err := extraTags.WriteString("="); err != nil {
break
}

if _, err := extraTags.WriteString(val); err != nil {
break
}
prev = true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think extra is needed at all :). You can probably try to use sort.SearchStrings on both the resTags and ignoredTags if you sort them as well. Not certain whether this will have beneficial results but it could be benchmarked

}
row[len(row)-1] = extraTags.String()

return row
}

// IsStringInSlice returns whether the string is contained within a string slice
func IsStringInSlice(slice []string, str string) bool {
if index := sort.SearchStrings(slice, str); index == len(slice) || slice[index] != str {
return false
}
return true
}

// GetRequiredSystemTags returns which sample tags are needed by this collector
func (c *Collector) GetRequiredSystemTags() lib.TagSet {
return lib.TagSet{} // There are no required tags for this collector
}
Loading