Skip to content

Commit

Permalink
fix: isPaneDeprecated method
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark committed Jul 13, 2023
1 parent 42176f4 commit f7bb4e2
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 47 deletions.
28 changes: 12 additions & 16 deletions metrics/util/aggregate/pane.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@ package aggregate
// pane represents a window over a period of time.
// It uses interface{} to store any type of value.
type pane struct {
StartInMs int64
EndInMs int64
IntervalInMs int64
Value interface{}
startInMs int64
endInMs int64
intervalInMs int64
value interface{}
}

func newPane(intervalInMs, startInMs int64, value interface{}) *pane {
return &pane{
StartInMs: startInMs,
EndInMs: startInMs + intervalInMs,
IntervalInMs: intervalInMs,
Value: value,
startInMs: startInMs,
endInMs: startInMs + intervalInMs,
intervalInMs: intervalInMs,
value: value,
}
}

// isTimeInWindow checks whether given timestamp is in current pane.
func (p *pane) isTimeInWindow(timeMillis int64) bool {
return p.StartInMs <= timeMillis && timeMillis < p.EndInMs
}

// isPaneDeprecated checks if the specified pane is deprecated at the specified timestamp
func (p *pane) isPaneDeprecated(timeMillis int64) bool {
return timeMillis-p.StartInMs > p.IntervalInMs
func (p *pane) resetTo(startInMs int64, value interface{}) {
p.startInMs = startInMs
p.endInMs = startInMs + p.intervalInMs
p.value = value
}
44 changes: 20 additions & 24 deletions metrics/util/aggregate/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package aggregate

import (
"github.com/influxdata/tdigest"
"sync"
"time"
)

import (
"github.com/influxdata/tdigest"
)

// TimeWindowQuantile wrappers sliding window around T-Digest.
//
// It uses T-Digest algorithm to calculate quantile.
Expand All @@ -40,50 +43,43 @@ func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds
}
}

// Quantile returns the quantile of the sliding window by merging all panes.
// Quantile returns a quantile of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantile(q float64) float64 {
t.mux.RLock()
defer t.mux.RUnlock()
return t.mergeTDigests().Quantile(q)
}

td := tdigest.NewWithCompression(t.compression)
for _, v := range t.window.values(time.Now().UnixMilli()) {
td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
// Quantiles returns quantiles of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
td := t.mergeTDigests()

res := make([]float64, len(qs))
for i, q := range qs {
res[i] = td.Quantile(q)
}
return td.Quantile(q)

return res
}

func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
// mergeTDigests merges all panes' TDigests into one TDigest.
func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest {
t.mux.RLock()
defer t.mux.RUnlock()

td := tdigest.NewWithCompression(t.compression)
for _, v := range t.window.values(time.Now().UnixMilli()) {
td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
}

res := make([]float64, len(qs))
for i, q := range qs {
res[i] = td.Quantile(q)
}

return res
return td
}

// Add adds a value to the sliding window's current pane.
func (t *TimeWindowQuantile) Add(value float64) {
t.mux.Lock()
defer t.mux.Unlock()

t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue, t.resetPaneTo).Value.(*tdigest.TDigest).Add(value, 1)
t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1)
}

func (t *TimeWindowQuantile) newEmptyValue() interface{} {
return tdigest.NewWithCompression(t.compression)
}

func (t *TimeWindowQuantile) resetPaneTo(p *pane, paneStart int64) *pane {
p.StartInMs = paneStart
p.EndInMs = paneStart + t.window.paneIntervalInMs
p.Value = t.newEmptyValue()
return p
}
2 changes: 1 addition & 1 deletion metrics/util/aggregate/quantile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package aggregate

import "testing"

func TestTimeWindowQuantile_AddAndQuantile(t1 *testing.T) {
func TestAddAndQuantile(t1 *testing.T) {
timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1)
for i := 1; i <= 100; i++ {
timeWindowQuantile.Add(float64(i))
Expand Down
18 changes: 12 additions & 6 deletions metrics/util/aggregate/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,22 @@ func (s *slidingWindow) values(timeMillis int64) []interface{} {
res := make([]interface{}, 0, s.paneCount)

for _, p := range s.paneSlice {
if p == nil || p.isPaneDeprecated(timeMillis) {
if p == nil || s.isPaneDeprecated(p, timeMillis) {
continue
}
res = append(res, p.Value)
res = append(res, p.value)
}

return res
}

// isPaneDeprecated checks if the specified pane is deprecated at the specified timeMillis
func (s *slidingWindow) isPaneDeprecated(pane *pane, timeMillis int64) bool {
return timeMillis-pane.startInMs > s.intervalInMs
}

// currentPane get the pane at the specified timestamp in milliseconds.
func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}, resetPaneTo func(*pane, int64) *pane) *pane {
func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane {
if timeMillis < 0 {
return nil
}
Expand All @@ -70,11 +75,12 @@ func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() inter
return p
} else {
p := s.paneSlice[paneIdx]
if paneStart == p.StartInMs {
if paneStart == p.startInMs {
return p
} else if paneStart > p.StartInMs {
} else if paneStart > p.startInMs {
// The pane has deprecated. To avoid the overhead of creating a new instance, reset the original pane directly.
return resetPaneTo(p, paneStart)
p.resetTo(paneStart, newEmptyValue())
return p
} else {
// The specified timestamp has passed.
return newPane(s.paneIntervalInMs, paneStart, newEmptyValue())
Expand Down

0 comments on commit f7bb4e2

Please sign in to comment.