
MemoryReporter: make call to runtime.ReadMemStats time bound to avoid lost metrics #1494

Merged 10 commits on Oct 16, 2019
17 changes: 13 additions & 4 deletions stats/memory_reporter.go
@@ -5,17 +5,26 @@ import (
 	"runtime"
 	"strconv"
 	"time"
+
+	"github.com/grafana/metrictank/util"
 )
 
 // MemoryReporter sources memory stats from the runtime and reports them
 // It also reports gcPercent based on the GOGC environment variable
 type MemoryReporter struct {
-	mem           runtime.MemStats
-	gcCyclesTotal uint32
+	mem                  runtime.MemStats
+	gcCyclesTotal        uint32
+	timeBoundGetMemStats func() interface{}
 }
 
 func NewMemoryReporter() *MemoryReporter {
-	return registry.getOrAdd("memory", &MemoryReporter{}).(*MemoryReporter)
+	reporter := registry.getOrAdd("memory", &MemoryReporter{}).(*MemoryReporter)
+	reporter.timeBoundGetMemStats = util.TimeBoundWithCacheFunc(func() interface{} {
+		mem := runtime.MemStats{}
+		runtime.ReadMemStats(&mem)
+		return mem
+	}, 5*time.Second, 1*time.Minute)
@Dieterbe (Contributor) commented on Oct 16, 2019:
Note that all graphite/metrictank systems support down to 1 second resolution; that's also what metrictank is configured to emit by default. As such, a 5s timeout seems too long for such a setup.

A contributor replied:

After more discussion we think it will be best to model the timeout after 65% of the set interval. However, we don't know if this is enough time for the rest of the reporters to complete their operations. What are your thoughts on the best way to proceed with fixing this issue? Also of note, once we update it here we will need to update it in a few other projects.

Another option is to launch them all at the same time and wait for results: any reporter that doesn't return a result within the interval won't get reported for that tick, but all the others will.
We could also pass a budget to each launched reporting function and let it decide whether to use caching. That adds a bit of overhead, which eats into the overall time allocated for that tick, but it still seems like the best option. Thoughts?

+	return reporter
 }

 func getGcPercent() int {
@@ -37,7 +46,7 @@ func getGcPercent() int {
 }
 
 func (m *MemoryReporter) ReportGraphite(prefix, buf []byte, now time.Time) []byte {
-	runtime.ReadMemStats(&m.mem)
+	m.mem = m.timeBoundGetMemStats().(runtime.MemStats)
 	gcPercent := getGcPercent()
 
 	// metric memory.total_bytes_allocated is a counter of total number of bytes allocated during process lifetime
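
Aside, to make the review discussion above concrete: here is a minimal sketch of the "launch all reporters concurrently, skip the late ones" option, with the per-tick deadline budgeted at 65% of the interval as floated in the thread. The Reporter interface, reportTick, and the package layout are hypothetical illustrations, not metrictank's actual API.

package stats

import "time"

// Reporter is a hypothetical stand-in for a metric reporter.
type Reporter interface {
	Report() []byte
}

// reportTick launches every reporter concurrently and keeps whatever returns
// before the per-tick deadline; anything slower is skipped for this tick but
// does not block the others.
func reportTick(reporters []Reporter, interval time.Duration) [][]byte {
	type result struct {
		idx int
		buf []byte
	}
	// buffered so late reporters can still send and their goroutines never leak
	results := make(chan result, len(reporters))
	for i, r := range reporters {
		go func(i int, r Reporter) {
			results <- result{i, r.Report()}
		}(i, r)
	}

	deadline := time.NewTimer(interval * 65 / 100) // 65% budget, leaving headroom in the tick
	defer deadline.Stop()

	out := make([][]byte, len(reporters))
	for range reporters {
		select {
		case res := <-results:
			out[res.idx] = res.buf
		case <-deadline.C:
			return out // only what arrived in time; slow reporters stay nil
		}
	}
	return out
}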
47 changes: 47 additions & 0 deletions util/timeout.go
@@ -0,0 +1,47 @@
package util

import (
"time"
)

// TimeBoundWithCacheFunc decorates a function that has a return value in order to bound its execution time.
// When the decorated function is called, if the original function takes more than 'timeout' to execute,
// the returned value will be the value from a previous call, provided that value is younger than 'maxAge'.
// If no previous call was performed (or the cached value is too old), the call will block until the
// original function returns. Note that the returned function is not safe for concurrent use: the cached
// value is read and written without synchronization.
func TimeBoundWithCacheFunc(fn func() interface{}, timeout, maxAge time.Duration) func() interface{} {
var previousResult interface{}
var previousTimestamp time.Time

return func() interface{} {
done := make(chan interface{}, 1)
timer := time.NewTimer(timeout)

		go func() {
			result := fn()
			// the channel is buffered (capacity 1), so this send never blocks
			// and the goroutine cannot leak even if the caller already timed out
			done <- result
			close(done)
		}()

var result interface{}

		select {
		case result = <-done:
			// call succeeded in time, use its result;
			// stop the timer so it does not linger until it fires
			// see https://medium.com/@oboturov/golang-time-after-is-not-garbage-collected-4cbc94740082
			timer.Stop()
		case <-timer.C:
			// call took too long: serve the cached result if it is fresh enough
			if time.Since(previousTimestamp) < maxAge && previousResult != nil {
				return previousResult
			}
			// no usable cache (never populated, or older than maxAge),
			// so block until the slow call eventually returns
			result = <-done
		}

previousTimestamp = time.Now()
previousResult = result
return result
}
}
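
For illustration, a minimal usage sketch of the new helper; the wrapped function, durations, and values here are made up for the example:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/metrictank/util"
)

func main() {
	// bound each call to 100ms; on timeout, serve a cached value up to 10s old
	collect := util.TimeBoundWithCacheFunc(func() interface{} {
		time.Sleep(500 * time.Millisecond) // simulate a slow collection, e.g. ReadMemStats blocking during GC
		return 42
	}, 100*time.Millisecond, 10*time.Second)

	fmt.Println(collect().(int)) // no cache yet: blocks the full 500ms, prints 42
	fmt.Println(collect().(int)) // times out after ~100ms and prints the cached 42
}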
192 changes: 192 additions & 0 deletions util/timeout_test.go
@@ -0,0 +1,192 @@
package util

import (
"reflect"
"runtime"
"testing"
"time"
)

func TestTimeBoundWithCacheFunc(t *testing.T) {
testMachineLatency := 50 * time.Millisecond

zeroDuration := 0 * time.Millisecond
veryShortDuration := 1 * time.Millisecond
shortDuration := veryShortDuration + testMachineLatency*2
longDuration := shortDuration + testMachineLatency*2
veryLongDuration := longDuration + testMachineLatency*2

tests := []struct {
name string
functionReturns []int
executionDurations []time.Duration
expectedResults []int
timeout time.Duration
maxAge time.Duration
}{
{
name: "immediate",
functionReturns: []int{42},
executionDurations: []time.Duration{zeroDuration},
expectedResults: []int{42},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "immediate_zero_timeout",
functionReturns: []int{42},
executionDurations: []time.Duration{zeroDuration},
expectedResults: []int{42},
timeout: zeroDuration,
maxAge: veryLongDuration,
},
{
name: "immediate_zero_maxage",
functionReturns: []int{42},
executionDurations: []time.Duration{zeroDuration},
expectedResults: []int{42},
timeout: shortDuration,
maxAge: zeroDuration,
},
{
name: "slow_notimeout",
functionReturns: []int{42},
executionDurations: []time.Duration{shortDuration},
expectedResults: []int{42},
timeout: longDuration,
maxAge: veryLongDuration,
},
{
name: "slow_timeout",
functionReturns: []int{42},
executionDurations: []time.Duration{longDuration},
expectedResults: []int{42},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "fast_then_slow_timeout",
functionReturns: []int{42, 88},
executionDurations: []time.Duration{veryShortDuration, longDuration},
expectedResults: []int{42, 42},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "fast_then_fast",
functionReturns: []int{42, 88},
executionDurations: []time.Duration{veryShortDuration, veryShortDuration},
expectedResults: []int{42, 88},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "slow_timeout_then_slow_timeout",
functionReturns: []int{42, 88},
executionDurations: []time.Duration{longDuration, longDuration},
expectedResults: []int{42, 42},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "fast_then_fast_then_slow_timeout",
functionReturns: []int{42, 88, 1},
executionDurations: []time.Duration{veryShortDuration, veryShortDuration, longDuration},
expectedResults: []int{42, 88, 88},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "fast_then_slow_then_fast_never_timeout",
functionReturns: []int{42, 88, 1},
executionDurations: []time.Duration{veryShortDuration, longDuration, veryShortDuration},
expectedResults: []int{42, 88, 1},
timeout: veryLongDuration,
maxAge: veryLongDuration,
},
{
name: "fast_then_slow_timeout_then_fast",
functionReturns: []int{42, 88, 1},
executionDurations: []time.Duration{veryShortDuration, longDuration, veryShortDuration},
expectedResults: []int{42, 42, 1},
timeout: shortDuration,
maxAge: veryLongDuration,
},
{
name: "stale_cache_fast_then_slow_timeout",
functionReturns: []int{42, 88},
executionDurations: []time.Duration{veryShortDuration, veryLongDuration},
expectedResults: []int{42, 88},
timeout: longDuration,
maxAge: shortDuration,
},
{
name: "stale_cache_slow_timeout_then_slow_timeout",
functionReturns: []int{42, 88},
executionDurations: []time.Duration{veryLongDuration, veryLongDuration},
expectedResults: []int{42, 88},
timeout: longDuration,
maxAge: shortDuration,
},
{
name: "stale_cache_fast_then_slow_timeout_then_fast_then_slow",
functionReturns: []int{42, 88, 1, 33},
executionDurations: []time.Duration{veryShortDuration, veryLongDuration, veryShortDuration, veryLongDuration},
expectedResults: []int{42, 88, 1, 33},
timeout: longDuration,
maxAge: shortDuration,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
numGoRoutineAtRest := runtime.NumGoroutine()
endTime := time.Now()

functionReturnChan := make(chan int, 1)
executionDurationChan := make(chan time.Duration, 1)
fn := func() interface{} {
time.Sleep(<-executionDurationChan)
return <-functionReturnChan
}
decoratedFunc := TimeBoundWithCacheFunc(fn, tt.timeout, tt.maxAge)
var cacheTimestamp time.Time

for i := range tt.executionDurations {
// compute end time for all function executions of the test
finishesAt := time.Now().Add(tt.executionDurations[i])
if finishesAt.After(endTime) {
endTime = finishesAt
}

functionReturnChan <- tt.functionReturns[i]
executionDurationChan <- tt.executionDurations[i]
beforeExecution := time.Now()
result := decoratedFunc()
			executionDuration := time.Since(beforeExecution)

			// check that the decorated call returned within tt.timeout (plus an allowance for scheduling latency);
			// if the function has no cached result, or the cache is stale due to maxAge, the timeout is not enforced
cacheIsStale := time.Now().After(cacheTimestamp.Add(tt.maxAge))
if !cacheIsStale && executionDuration > tt.timeout+testMachineLatency {
t.Errorf("iteration %v: decoratedFunc() took too long to execute %v which is greater than timeout (%v)", i, executionDuration, tt.timeout)
}

if !reflect.DeepEqual(result, tt.expectedResults[i]) {
t.Errorf("iteration %v: decoratedFunc() = %v, want %v", i, result, tt.expectedResults[i])
}

// save last time a result was cached
if tt.executionDurations[i] <= tt.timeout {
cacheTimestamp = time.Now()
}
}

			// detect whether any goroutines leaked after all processing is supposed to be complete
time.Sleep(time.Until(endTime.Add(testMachineLatency)))
extraGoRoutines := runtime.NumGoroutine() - numGoRoutineAtRest
if extraGoRoutines > 0 {
t.Errorf("too many goroutines left after processing: %d", extraGoRoutines)
}
})
}
}