Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change in Interval in the s3 plugin from int to float64 #759

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* The signalfx sink no longer deadlocks the flush process if it receives more than one error per submission. Thanks, [antifuchs](https://github.com/antifuchs)!
* Fixed the README to link to the correct HLL implementation. Thanks, [gphat](https://github.com/gphat)!
* Fixed the BucketRegionError while using the S3 Plugin. Thanks, [linuxdynasty](https://github.com/linuxdynasty)!
* Fixed server.go so that it passes `Interval` to the S3 Plugin and localfile plugin. Also added tests in the csv_test.go to test for `+Inf` when passing an Interval of 0. Thanks, [linuxdynasty](https://github.com/linuxdynasty)!

## Removed

Expand Down
6 changes: 3 additions & 3 deletions plugins/localfile/localfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Plugin struct {
FilePath string
Logger *logrus.Logger
hostname string
interval int
Interval float64
}

// Delimiter defines what kind of delimiter we'll use in the CSV format -- in this case, we want TSV
Expand All @@ -36,11 +36,11 @@ func (p *Plugin) Flush(ctx context.Context, metrics []samplers.InterMetric) erro
if err != nil {
return fmt.Errorf("couldn't open %s for appending: %s", p.FilePath, err)
}
appendToWriter(f, metrics, p.hostname, p.interval)
appendToWriter(f, metrics, p.hostname, p.Interval)
return nil
}

func appendToWriter(appender io.Writer, metrics []samplers.InterMetric, hostname string, interval int) error {
func appendToWriter(appender io.Writer, metrics []samplers.InterMetric, hostname string, interval float64) error {
gzW := gzip.NewWriter(appender)
csvW := csv.NewWriter(gzW)
csvW.Comma = Delimiter
Expand Down
6 changes: 3 additions & 3 deletions plugins/s3/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var tsvSchema = [...]string{
// The caller is responsible for setting w.Comma as the appropriate delimiter.
// For performance, encodeCSV does not flush after every call; the caller is
// expected to flush at the end of the operation cycle
func EncodeInterMetricCSV(d samplers.InterMetric, w *csv.Writer, partitionDate *time.Time, hostName string, interval int) error {
func EncodeInterMetricCSV(d samplers.InterMetric, w *csv.Writer, partitionDate *time.Time, hostName string, interval float64) error {
// TODO(aditya) some better error handling for this
// to guarantee that the result is proper JSON
tags := "{" + strings.Join(d.Tags, ",") + "}"
Expand All @@ -62,7 +62,7 @@ func EncodeInterMetricCSV(d samplers.InterMetric, w *csv.Writer, partitionDate *
metricValue := d.Value
switch d.Type {
case samplers.CounterMetric:
metricValue = d.Value / float64(interval)
metricValue = d.Value / interval
metricType = "rate"

case samplers.GaugeMetric:
Expand All @@ -77,7 +77,7 @@ func EncodeInterMetricCSV(d samplers.InterMetric, w *csv.Writer, partitionDate *
TsvName: d.Name,
TsvTags: tags,
TsvMetricType: metricType,
TsvInterval: strconv.Itoa(interval),
TsvInterval: strconv.FormatFloat(interval, 'f', -1, 64),
TsvVeneurHostname: hostName,
TsvValue: strconv.FormatFloat(metricValue, 'f', -1, 64),

Expand Down
45 changes: 45 additions & 0 deletions plugins/s3/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ func CSVTestCases() []CSVTestCase {
}
}

// ZeroIntervalCSVTestCases returns the table of cases used to verify that
// encoding a counter with an interval of 0 yields a "+Inf" rate (float64
// division by zero), rather than an error or a garbage value.
func ZeroIntervalCSVTestCases() []CSVTestCase {
	// Partition key matches the encoder's UTC yyyymmdd date column.
	today := time.Now().UTC().Format("20060102")

	expectedRow := fmt.Sprintf("a.b.c.max\t{foo:bar,baz:quz}\trate\ttestbox-c3eac9\t0\t2016-10-10 05:04:18\t+Inf\t%s\n", today)

	return []CSVTestCase{
		{
			// A counter rate with interval 0 must serialize as +Inf.
			Name: "Interval of 0",
			InterMetric: samplers.InterMetric{
				Name:      "a.b.c.max",
				Timestamp: 1476119058,
				Value:     float64(100),
				Tags:      []string{"foo:bar", "baz:quz"},
				Type:      samplers.CounterMetric,
			},
			Row: strings.NewReader(expectedRow),
		},
	}
}

func TestEncodeCSV(t *testing.T) {
testCases := CSVTestCases()

Expand All @@ -92,6 +113,30 @@ func TestEncodeCSV(t *testing.T) {
}
}

// TestEncodeCSVZeroInterval verifies that EncodeInterMetricCSV emits "+Inf"
// for a counter's rate when called with an interval of 0.
func TestEncodeCSVZeroInterval(t *testing.T) {
	testCases := ZeroIntervalCSVTestCases()

	for _, tc := range testCases {
		t.Run(tc.Name, func(t *testing.T) {

			b := &bytes.Buffer{}

			w := csv.NewWriter(b)
			w.Comma = '\t'

			tm := time.Now()
			err := EncodeInterMetricCSV(tc.InterMetric, w, &tm, "testbox-c3eac9", 0)
			assert.NoError(t, err)

			// We need to flush or there won't actually be any data there
			w.Flush()
			// Flush reports failures via the writer itself; re-checking the
			// stale err from EncodeInterMetricCSV would never catch them.
			assert.NoError(t, w.Error())

			assertReadersEqual(t, tc.Row, b)
		})
	}
}

// Helper function for determining that two readers are equal
func assertReadersEqual(t *testing.T, expected io.Reader, actual io.Reader) {

Expand Down
4 changes: 2 additions & 2 deletions plugins/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type S3Plugin struct {
Svc s3iface.S3API
S3Bucket string
Hostname string
Interval int
Interval float64
}

func (p *S3Plugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error {
Expand Down Expand Up @@ -96,7 +96,7 @@ func S3Path(hostname string, ft filetype) *string {
// EncodeInterMetricsCSV returns a reader containing the gzipped CSV representation of the
// InterMetric data, one row per InterMetric.
// the AWS sdk requires seekable input, so we return a ReadSeeker here
func EncodeInterMetricsCSV(metrics []samplers.InterMetric, delimiter rune, includeHeaders bool, hostname string, interval int) (io.ReadSeeker, error) {
func EncodeInterMetricsCSV(metrics []samplers.InterMetric, delimiter rune, includeHeaders bool, hostname string, interval float64) (io.ReadSeeker, error) {
b := &bytes.Buffer{}
gzw := gzip.NewWriter(b)
w := csv.NewWriter(gzw)
Expand Down
42 changes: 42 additions & 0 deletions plugins/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,45 @@ func TestEncodeDDMetricsCSV(t *testing.T) {
})
}
}

// TestEncodeDDMetricsCSVZeroInterval exercises the full gzip+CSV encoding
// path with an interval of 0: the header row must be intact and every
// counter row must carry a "+Inf" rate value.
func TestEncodeDDMetricsCSVZeroInterval(t *testing.T) {
	const (
		wantHeader     = "Name\tTags\tMetricType\tVeneurHostname\tInterval\tTimestamp\tValue\tPartition"
		delimiter      = '\t'
		veneurHostname = "testbox-c3eac9"
	)

	testCases := ZeroIntervalCSVTestCases()

	metrics := make([]samplers.InterMetric, 0, len(testCases))
	for _, tc := range testCases {
		metrics = append(metrics, tc.InterMetric)
	}

	encoded, err := EncodeInterMetricsCSV(metrics, delimiter, true, veneurHostname, 0)
	assert.NoError(t, err)
	gzReader, err := gzip.NewReader(encoded)
	assert.NoError(t, err)
	csvReader := csv.NewReader(gzReader)
	csvReader.FieldsPerRecord = 8
	csvReader.Comma = delimiter

	// first line should always contain header information
	header, err := csvReader.Read()
	assert.NoError(t, err)
	assert.Equal(t, wantHeader, strings.Join(header, "\t"))

	records, err := csvReader.ReadAll()
	assert.NoError(t, err)

	assert.Equal(t, len(metrics), len(records), "Expected %d records and got %d", len(metrics), len(records))
	for i, tc := range testCases {
		record := records[i]
		t.Run(tc.Name, func(t *testing.T) {
			// Re-quote any cell containing the delimiter so the joined
			// line matches the raw expected TSV row.
			for j, cell := range record {
				if strings.ContainsRune(cell, delimiter) {
					record[j] = `"` + cell + `"`
				}
			}
			AssertReadersEqual(t, tc.Row, strings.NewReader(strings.Join(record, "\t")+"\n"))
		})
	}
}
2 changes: 2 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) {
Svc: svc,
S3Bucket: conf.AwsS3Bucket,
Hostname: ret.Hostname,
Interval: ret.interval.Seconds(),
}
ret.registerPlugin(plugin)
}
Expand All @@ -721,6 +722,7 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) {
localFilePlugin := &localfilep.Plugin{
FilePath: conf.FlushFile,
Logger: log,
Interval: ret.interval.Seconds(),
}
ret.registerPlugin(localFilePlugin)
logger.Info(fmt.Sprintf("Local file logging to %s", conf.FlushFile))
Expand Down