From 2e0145e8e7b0f97b5104887384707c306d4cb2cb Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Sat, 15 Jul 2023 21:56:39 +0800 Subject: [PATCH 1/2] feat: wrapper sliding window with custom aggregator --- metrics/util/aggregate/aggregator.go | 164 ++++++++++++++++++++++ metrics/util/aggregate/aggregator_test.go | 54 +++++++ metrics/util/aggregate/sliding_window.go | 2 +- 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 metrics/util/aggregate/aggregator.go create mode 100644 metrics/util/aggregate/aggregator_test.go diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go new file mode 100644 index 0000000000..aaf2c854b0 --- /dev/null +++ b/metrics/util/aggregate/aggregator.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package aggregate + +import ( + "math" + "sync" + "sync/atomic" + "time" +) + +// TimeWindowAggregator wrappers sliding window to aggregate data. +// +// It uses custom struct aggregator to aggregate data. +// The window is divided into several panes, and each pane's value is an aggregator instance. +type TimeWindowAggregator struct { + window *slidingWindow + mux sync.RWMutex +} + +func NewTimeWindowAggregator(paneCount int, timeWindowSeconds int64) *TimeWindowAggregator { + return &TimeWindowAggregator{ + window: newSlidingWindow(paneCount, timeWindowSeconds*1000), + } +} + +type AggregateResult struct { + Total float64 + Min float64 + Max float64 + Avg float64 + Count uint64 +} + +// Result returns the aggregate result of the sliding window by aggregating all panes. +func (t *TimeWindowAggregator) Result() *AggregateResult { + t.mux.RLock() + defer t.mux.RUnlock() + + res := &AggregateResult{} + + total := 0.0 + count := uint64(0) + max := math.SmallestNonzeroFloat64 + min := math.MaxFloat64 + + for _, v := range t.window.values(time.Now().UnixMilli()) { + total += v.(*aggregator).total.Load().(float64) + count += v.(*aggregator).count.Load().(uint64) + max = math.Max(max, v.(*aggregator).max.Load().(float64)) + min = math.Min(min, v.(*aggregator).min.Load().(float64)) + } + + if count > 0 { + res.Avg = total / float64(count) + res.Count = count + res.Total = total + res.Max = max + res.Min = min + } + + return res +} + +// Add adds a value to the sliding window's current pane. +func (t *TimeWindowAggregator) Add(v float64) { + t.mux.Lock() + defer t.mux.Unlock() + + t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v) +} + +func (t *TimeWindowAggregator) newEmptyValue() interface{} { + return &aggregator{} +} + +type aggregator struct { + min atomic.Value // float64 + max atomic.Value // float64 + total atomic.Value // float64 + count atomic.Value // uint64 +} + +func (a *aggregator) add(v float64) { + a.updateMin(v) + a.updateMax(v) + a.updateTotal(v) + a.updateCount() +} + +func (a *aggregator) updateMin(v float64) { + for { + store := a.min.Load() + if store == nil { + if ok := a.min.CompareAndSwap(nil, v); ok { + return + } + } else { + if ok := a.min.CompareAndSwap(store, math.Min(store.(float64), v)); ok { + return + } + } + } +} + +func (a *aggregator) updateMax(v float64) { + for { + store := a.max.Load() + if store == nil { + if ok := a.max.CompareAndSwap(nil, v); ok { + return + } + } else { + if ok := a.max.CompareAndSwap(store, math.Max(store.(float64), v)); ok { + return + } + } + } +} + +func (a *aggregator) updateTotal(v float64) { + for { + store := a.total.Load() + if store == nil { + if ok := a.total.CompareAndSwap(nil, v); ok { + return + } + } else { + if ok := a.total.CompareAndSwap(store, store.(float64)+v); ok { + return + } + } + } +} + +func (a *aggregator) updateCount() { + for { + store := a.count.Load() + if store == nil { + if ok := a.count.CompareAndSwap(nil, uint64(1)); ok { + return + } + } else { + if ok := a.count.CompareAndSwap(store, store.(uint64)+1); ok { + return + } + } + } +} diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go new file mode 100644 index 0000000000..3b93c46abe --- /dev/null +++ b/metrics/util/aggregate/aggregator_test.go @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package aggregate + +import ( + "reflect" + "testing" +) + +func TestAddAndResult(t *testing.T) { + timeWindowAggregator := NewTimeWindowAggregator(10, 1) + timeWindowAggregator.Add(10) + timeWindowAggregator.Add(20) + timeWindowAggregator.Add(30) + + tests := []struct { + name string + want *AggregateResult + }{ + { + name: "Result", + want: &AggregateResult{ + Total: 60, + Min: 10, + Max: 30, + Avg: 20, + Count: 3, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := timeWindowAggregator.Result(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Result() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go index feda9219e3..9d337380d4 100644 --- a/metrics/util/aggregate/sliding_window.go +++ b/metrics/util/aggregate/sliding_window.go @@ -61,7 +61,7 @@ func (s *slidingWindow) isPaneDeprecated(pane *pane, timeMillis int64) bool { return timeMillis-pane.startInMs > s.intervalInMs } -// currentPane get the pane at the specified timestamp in milliseconds. +// currentPane get the pane at the specified timestamp or create a new one if the pane is deprecated. func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane { if timeMillis < 0 { return nil From ba5ecc35104c6161f3a24437d33a2b36bd271085 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Sun, 16 Jul 2023 10:36:26 +0800 Subject: [PATCH 2/2] fix: replace unnecessary atomic types with plain types --- metrics/util/aggregate/aggregator.go | 91 ++++++++--------------- metrics/util/aggregate/aggregator_test.go | 37 ++++++++- metrics/util/aggregate/quantile.go | 1 + metrics/util/aggregate/sliding_window.go | 2 +- 4 files changed, 66 insertions(+), 65 deletions(-) diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go index aaf2c854b0..0f31f3fc0c 100644 --- a/metrics/util/aggregate/aggregator.go +++ b/metrics/util/aggregate/aggregator.go @@ -20,12 +20,12 @@ package aggregate import ( "math" "sync" - "sync/atomic" "time" ) // TimeWindowAggregator wrappers sliding window to aggregate data. // +// It is concurrent-safe. // It uses custom struct aggregator to aggregate data. // The window is divided into several panes, and each pane's value is an aggregator instance. type TimeWindowAggregator struct { @@ -39,7 +39,7 @@ func NewTimeWindowAggregator(paneCount int, timeWindowSeconds int64) *TimeWindow } } -type AggregateResult struct { +type Result struct { Total float64 Min float64 Max float64 @@ -48,11 +48,11 @@ type AggregateResult struct { } // Result returns the aggregate result of the sliding window by aggregating all panes. -func (t *TimeWindowAggregator) Result() *AggregateResult { +func (t *TimeWindowAggregator) Result() *Result { t.mux.RLock() defer t.mux.RUnlock() - res := &AggregateResult{} + res := &Result{} total := 0.0 count := uint64(0) @@ -60,10 +60,10 @@ func (t *TimeWindowAggregator) Result() *AggregateResult { min := math.MaxFloat64 for _, v := range t.window.values(time.Now().UnixMilli()) { - total += v.(*aggregator).total.Load().(float64) - count += v.(*aggregator).count.Load().(uint64) - max = math.Max(max, v.(*aggregator).max.Load().(float64)) - min = math.Min(min, v.(*aggregator).min.Load().(float64)) + total += v.(*aggregator).total + count += v.(*aggregator).count + max = math.Max(max, v.(*aggregator).max) + min = math.Min(min, v.(*aggregator).min) } if count > 0 { @@ -86,14 +86,27 @@ func (t *TimeWindowAggregator) Add(v float64) { } func (t *TimeWindowAggregator) newEmptyValue() interface{} { - return &aggregator{} + return newAggregator() } +// aggregator is a custom struct to aggregate data. +// +// It is NOT concurrent-safe. +// It aggregates data by calculating the min, max, total and count. type aggregator struct { - min atomic.Value // float64 - max atomic.Value // float64 - total atomic.Value // float64 - count atomic.Value // uint64 + min float64 + max float64 + total float64 + count uint64 +} + +func newAggregator() *aggregator { + return &aggregator{ + min: math.MaxFloat64, + max: math.SmallestNonzeroFloat64, + total: float64(0), + count: uint64(0), + } } func (a *aggregator) add(v float64) { @@ -104,61 +117,17 @@ func (a *aggregator) add(v float64) { } func (a *aggregator) updateMin(v float64) { - for { - store := a.min.Load() - if store == nil { - if ok := a.min.CompareAndSwap(nil, v); ok { - return - } - } else { - if ok := a.min.CompareAndSwap(store, math.Min(store.(float64), v)); ok { - return - } - } - } + a.min = math.Min(a.min, v) } func (a *aggregator) updateMax(v float64) { - for { - store := a.max.Load() - if store == nil { - if ok := a.max.CompareAndSwap(nil, v); ok { - return - } - } else { - if ok := a.max.CompareAndSwap(store, math.Max(store.(float64), v)); ok { - return - } - } - } + a.max = math.Max(a.max, v) } func (a *aggregator) updateTotal(v float64) { - for { - store := a.total.Load() - if store == nil { - if ok := a.total.CompareAndSwap(nil, v); ok { - return - } - } else { - if ok := a.total.CompareAndSwap(store, store.(float64)+v); ok { - return - } - } - } + a.total += v } func (a *aggregator) updateCount() { - for { - store := a.count.Load() - if store == nil { - if ok := a.count.CompareAndSwap(nil, uint64(1)); ok { - return - } - } else { - if ok := a.count.CompareAndSwap(store, store.(uint64)+1); ok { - return - } - } - } + a.count++ } diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go index 3b93c46abe..d3fa4f644e 100644 --- a/metrics/util/aggregate/aggregator_test.go +++ b/metrics/util/aggregate/aggregator_test.go @@ -18,11 +18,13 @@ package aggregate import ( + "math/rand" "reflect" + "sync" "testing" ) -func TestAddAndResult(t *testing.T) { +func TestTimeWindowAggregatorAddAndResult(t *testing.T) { timeWindowAggregator := NewTimeWindowAggregator(10, 1) timeWindowAggregator.Add(10) timeWindowAggregator.Add(20) @@ -30,11 +32,11 @@ func TestAddAndResult(t *testing.T) { tests := []struct { name string - want *AggregateResult + want *Result }{ { name: "Result", - want: &AggregateResult{ + want: &Result{ Total: 60, Min: 10, Max: 30, @@ -52,3 +54,32 @@ func TestAddAndResult(t *testing.T) { }) } } + +func BenchmarkTimeWindowAggregatorAdd(b *testing.B) { + wg := sync.WaitGroup{} + tw := NewTimeWindowAggregator(10, 1) + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tw.Add(rand.Float64() * 100) + }() + } + wg.Wait() +} + +func BenchmarkTimeWindowAggregatorResult(b *testing.B) { + wg := sync.WaitGroup{} + tw := NewTimeWindowAggregator(10, 1) + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + tw.Add(rand.Float64() * 100) + }() + go func() { + defer wg.Done() + tw.Result() + }() + } + wg.Wait() +} diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go index 1a78ba58c7..c5aa4fd376 100644 --- a/metrics/util/aggregate/quantile.go +++ b/metrics/util/aggregate/quantile.go @@ -28,6 +28,7 @@ import ( // TimeWindowQuantile wrappers sliding window around T-Digest. // +// It is concurrent safe. // It uses T-Digest algorithm to calculate quantile. // The window is divided into several panes, and each pane's value is a TDigest instance. type TimeWindowQuantile struct { diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go index 9d337380d4..42e2f083a7 100644 --- a/metrics/util/aggregate/sliding_window.go +++ b/metrics/util/aggregate/sliding_window.go @@ -19,7 +19,7 @@ package aggregate // SlidingWindow adopts sliding window algorithm for statistics. // -// It is not thread-safe. +// It is NOT concurrent-safe. // A window contains paneCount panes. // intervalInMs = paneCount * paneIntervalInMs. type slidingWindow struct {