diff --git a/metrics/util/aggregate/pane.go b/metrics/util/aggregate/pane.go index 0fbb77046b..a7b0fba545 100644 --- a/metrics/util/aggregate/pane.go +++ b/metrics/util/aggregate/pane.go @@ -20,27 +20,23 @@ package aggregate // pane represents a window over a period of time. // It uses interface{} to store any type of value. type pane struct { - StartInMs int64 - EndInMs int64 - IntervalInMs int64 - Value interface{} + startInMs int64 + endInMs int64 + intervalInMs int64 + value interface{} } func newPane(intervalInMs, startInMs int64, value interface{}) *pane { return &pane{ - StartInMs: startInMs, - EndInMs: startInMs + intervalInMs, - IntervalInMs: intervalInMs, - Value: value, + startInMs: startInMs, + endInMs: startInMs + intervalInMs, + intervalInMs: intervalInMs, + value: value, } } -// isTimeInWindow checks whether given timestamp is in current pane. -func (p *pane) isTimeInWindow(timeMillis int64) bool { - return p.StartInMs <= timeMillis && timeMillis < p.EndInMs -} - -// isPaneDeprecated checks if the specified pane is deprecated at the specified timestamp -func (p *pane) isPaneDeprecated(timeMillis int64) bool { - return timeMillis-p.StartInMs > p.IntervalInMs +func (p *pane) resetTo(startInMs int64, value interface{}) { + p.startInMs = startInMs + p.endInMs = startInMs + p.intervalInMs + p.value = value } diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go index f0502e6ada..1a78ba58c7 100644 --- a/metrics/util/aggregate/quantile.go +++ b/metrics/util/aggregate/quantile.go @@ -18,11 +18,14 @@ package aggregate import ( - "github.com/influxdata/tdigest" "sync" "time" ) +import ( + "github.com/influxdata/tdigest" +) + // TimeWindowQuantile wrappers sliding window around T-Digest. // // It uses T-Digest algorithm to calculate quantile. @@ -40,19 +43,25 @@ func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds } } -// Quantile returns the quantile of the sliding window by merging all panes. +// Quantile returns a quantile of the sliding window by merging all panes. func (t *TimeWindowQuantile) Quantile(q float64) float64 { - t.mux.RLock() - defer t.mux.RUnlock() + return t.mergeTDigests().Quantile(q) +} - td := tdigest.NewWithCompression(t.compression) - for _, v := range t.window.values(time.Now().UnixMilli()) { - td.AddCentroidList(v.(*tdigest.TDigest).Centroids()) +// Quantiles returns quantiles of the sliding window by merging all panes. +func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 { + td := t.mergeTDigests() + + res := make([]float64, len(qs)) + for i, q := range qs { + res[i] = td.Quantile(q) } - return td.Quantile(q) + + return res } -func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 { +// mergeTDigests merges all panes' TDigests into one TDigest. +func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest { t.mux.RLock() defer t.mux.RUnlock() @@ -60,13 +69,7 @@ func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 { for _, v := range t.window.values(time.Now().UnixMilli()) { td.AddCentroidList(v.(*tdigest.TDigest).Centroids()) } - - res := make([]float64, len(qs)) - for i, q := range qs { - res[i] = td.Quantile(q) - } - - return res + return td } // Add adds a value to the sliding window's current pane. @@ -74,16 +77,9 @@ func (t *TimeWindowQuantile) Add(value float64) { t.mux.Lock() defer t.mux.Unlock() - t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue, t.resetPaneTo).Value.(*tdigest.TDigest).Add(value, 1) + t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1) } func (t *TimeWindowQuantile) newEmptyValue() interface{} { return tdigest.NewWithCompression(t.compression) } - -func (t *TimeWindowQuantile) resetPaneTo(p *pane, paneStart int64) *pane { - p.StartInMs = paneStart - p.EndInMs = paneStart + t.window.paneIntervalInMs - p.Value = t.newEmptyValue() - return p -} diff --git a/metrics/util/aggregate/quantile_test.go b/metrics/util/aggregate/quantile_test.go index efcb022f0e..5e82431661 100644 --- a/metrics/util/aggregate/quantile_test.go +++ b/metrics/util/aggregate/quantile_test.go @@ -19,7 +19,7 @@ package aggregate import "testing" -func TestTimeWindowQuantile_AddAndQuantile(t1 *testing.T) { +func TestAddAndQuantile(t1 *testing.T) { timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1) for i := 1; i <= 100; i++ { timeWindowQuantile.Add(float64(i)) diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go index d6aa628c02..feda9219e3 100644 --- a/metrics/util/aggregate/sliding_window.go +++ b/metrics/util/aggregate/sliding_window.go @@ -47,17 +47,22 @@ func (s *slidingWindow) values(timeMillis int64) []interface{} { res := make([]interface{}, 0, s.paneCount) for _, p := range s.paneSlice { - if p == nil || p.isPaneDeprecated(timeMillis) { + if p == nil || s.isPaneDeprecated(p, timeMillis) { continue } - res = append(res, p.Value) + res = append(res, p.value) } return res } +// isPaneDeprecated checks if the specified pane is deprecated at the specified timeMillis +func (s *slidingWindow) isPaneDeprecated(pane *pane, timeMillis int64) bool { + return timeMillis-pane.startInMs > s.intervalInMs +} + // currentPane get the pane at the specified timestamp in milliseconds. -func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}, resetPaneTo func(*pane, int64) *pane) *pane { +func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane { if timeMillis < 0 { return nil } @@ -70,11 +75,12 @@ func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() inter return p } else { p := s.paneSlice[paneIdx] - if paneStart == p.StartInMs { + if paneStart == p.startInMs { return p - } else if paneStart > p.StartInMs { + } else if paneStart > p.startInMs { // The pane has deprecated. To avoid the overhead of creating a new instance, reset the original pane directly. - return resetPaneTo(p, paneStart) + p.resetTo(paneStart, newEmptyValue()) + return p } else { // The specified timestamp has passed. return newPane(s.paneIntervalInMs, paneStart, newEmptyValue())