diff --git a/godeps.txt b/godeps.txt index 3d79e195a..eadec22e9 100644 --- a/godeps.txt +++ b/godeps.txt @@ -835,7 +835,6 @@ github.com/ServiceWeaver/weaver/runtime/logging time github.com/ServiceWeaver/weaver/runtime/metrics encoding/binary - expvar fmt github.com/ServiceWeaver/weaver/runtime/protos github.com/google/uuid diff --git a/metrics/metrics.go b/metrics/metrics.go index e473d5db7..e4b9277fd 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -39,6 +39,11 @@ func (c *Counter) Name() string { return c.impl.Name() } +// Inc increases the counter by one. +func (c *Counter) Inc() { + c.impl.Inc() +} + // Add increases the counter by delta. It panics if the delta is negative. func (c *Counter) Add(delta float64) { c.impl.Add(delta) diff --git a/runtime/codegen/metrics.go b/runtime/codegen/metrics.go index 1fdbfbbd2..96d90661d 100644 --- a/runtime/codegen/metrics.go +++ b/runtime/codegen/metrics.go @@ -93,9 +93,9 @@ func (m *MethodMetrics) Begin() MethodCallHandle { // End ends metric update recording for a call to method m. func (m *MethodMetrics) End(h MethodCallHandle, failed bool, requestBytes, replyBytes int) { latency := time.Now().UnixMicro() - h.start - m.Count.Add(1) + m.Count.Inc() if failed { - m.ErrorCount.Add(1) + m.ErrorCount.Inc() } m.Latency.Put(float64(latency)) if m.remote { diff --git a/runtime/metrics/atomic.go b/runtime/metrics/atomic.go new file mode 100644 index 000000000..762e73118 --- /dev/null +++ b/runtime/metrics/atomic.go @@ -0,0 +1,44 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "math" + "sync/atomic" +) + +// atomicFloat64 provides atomic storage for float64. +type atomicFloat64 struct { + v atomic.Uint64 // stores result of math.Float64bits +} + +// get returns the current value stored in f. +func (f *atomicFloat64) get() float64 { return math.Float64frombits(f.v.Load()) } + +// set stores v in f. +func (f *atomicFloat64) set(v float64) { f.v.Store(math.Float64bits(v)) } + +// add atomically adds v to f. +func (f *atomicFloat64) add(v float64) { + // Use compare-and-swap to change the stored representation + // atomically from old value to old value + v. + for { + cur := f.v.Load() + next := math.Float64bits(math.Float64frombits(cur) + v) + if f.v.CompareAndSwap(cur, next) { + return + } + } +} diff --git a/runtime/metrics/atomic_test.go b/runtime/metrics/atomic_test.go new file mode 100644 index 000000000..f12709588 --- /dev/null +++ b/runtime/metrics/atomic_test.go @@ -0,0 +1,75 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "math" + "sync" + "testing" +) + +var testFloats = []float64{math.Inf(-1), -2, -1, 0.5, -0, 0, 0.5, 1, 2, math.Inf(1)} + +func TestAtomicFloat64GetSet(t *testing.T) { + var f atomicFloat64 + for _, v := range testFloats { + f.set(v) + if got := f.get(); got != v { + t.Errorf("get(set(%v)) = %v", v, got) + } + } +} + +func TestAtomicFloat64Add(t *testing.T) { + for _, a := range testFloats { + for _, b := range testFloats { + var v atomicFloat64 + v.add(a) + v.add(b) + want := a + b + got := v.get() + + same := (math.IsNaN(want) && math.IsNaN(got)) || (want == got) + if !same { + t.Errorf("Add(%v) to %v produced %v, expecting %v", b, a, got, want) + } + } + } +} + +func TestAtomicFloat64Parallelism(t *testing.T) { + var f atomicFloat64 + + // Add to f from multiple goroutines in parallel. + const iters = 1000000 + const parallelism = 8 + var wg sync.WaitGroup + wg.Add(parallelism) + for i := 0; i < parallelism; i++ { + go func() { + defer wg.Done() + for j := 0; j < iters; j++ { + f.add(1.0) + } + }() + } + wg.Wait() + + want := float64(iters * parallelism) + got := f.get() + if got != want { + t.Errorf("concurrent adds produced %v, expecting %v", got, want) + } +} diff --git a/runtime/metrics/io.go b/runtime/metrics/io.go index 9aa5835f8..26e39428e 100644 --- a/runtime/metrics/io.go +++ b/runtime/metrics/io.go @@ -17,8 +17,8 @@ package metrics import ( "fmt" - "golang.org/x/exp/maps" "github.com/ServiceWeaver/weaver/runtime/protos" + "golang.org/x/exp/maps" ) // An Exporter produces MetricUpdates summarizing the change in metrics over @@ -42,13 +42,13 @@ func (e *Exporter) Export() *protos.MetricUpdate { metric.Init() latest, ok := e.versions[metric.id] if !ok { - e.versions[metric.id] = metric.Version() + e.versions[metric.id] = metric.version.Load() update.Defs = append(update.Defs, metric.MetricDef()) update.Values = append(update.Values, metric.MetricValue()) continue } - version := metric.Version() + version := metric.version.Load() if version == latest { continue } diff --git a/runtime/metrics/metrics.go b/runtime/metrics/metrics.go index 3a0d86bde..fa01affa2 100644 --- a/runtime/metrics/metrics.go +++ b/runtime/metrics/metrics.go @@ -17,7 +17,6 @@ package metrics import ( "encoding/binary" - "expvar" "fmt" "math" "sort" @@ -68,11 +67,13 @@ type Metric struct { once sync.Once // used to initialize id and labels id uint64 // globally unique metric id labels map[string]string // materialized labels from calling labelsThunk + fvalue atomicFloat64 // value for Counter and Gauge, sum for Histogram + ivalue atomic.Uint64 // integer increments for Counter (separated for speed) - version atomic.Uint64 // incremented on every update, for change detection - value expvar.Float // value for Counter and Gauge, sum for Histogram - bounds []float64 // histogram bounds - counts []atomic.Uint64 // histogram counts + version atomic.Uint64 // incremented on every update, for change detection + + bounds []float64 // histogram bounds + counts []atomic.Uint64 // histogram counts } // A MetricSnapshot is a snapshot of a metric. @@ -185,32 +186,48 @@ func (m *Metric) Name() string { return m.name } +// Inc adds one to the metric value. +func (m *Metric) Inc() { + m.ivalue.Add(1) +} + // Add adds the provided delta to the metric's value. func (m *Metric) Add(delta float64) { - m.value.Add(delta) + m.fvalue.add(delta) m.version.Add(1) } // Sub subtracts the provided delta from the metric's value. func (m *Metric) Sub(delta float64) { - m.value.Add(-delta) + m.fvalue.add(-delta) m.version.Add(1) } // Set sets the metric's value. func (m *Metric) Set(val float64) { - m.value.Set(val) + m.fvalue.set(val) m.version.Add(1) } // Put adds the provided value to the metric's histogram. func (m *Metric) Put(val float64) { - idx := sort.SearchFloat64s(m.bounds, val) - if idx < len(m.bounds) && val == m.bounds[idx] { - idx++ + var idx int + if len(m.bounds) == 0 || val < m.bounds[0] { + // Skip binary search for values that fall in the first bucket + // (often true for short latency operations). + } else { + idx = sort.SearchFloat64s(m.bounds, val) + if idx < len(m.bounds) && val == m.bounds[idx] { + idx++ + } } m.counts[idx].Add(1) - m.value.Add(val) + + // Microsecond latencies are often zero for very fast functions. + if val != 0 { + m.fvalue.add(val) + } + m.version.Add(1) } @@ -225,9 +242,9 @@ func (m *Metric) Init() { }) } -// Version returns the metric's version. -func (m *Metric) Version() uint64 { - return m.version.Load() +// get returns the current value (sum of all added values for histograms). +func (m *Metric) get() float64 { + return m.fvalue.get() + float64(m.ivalue.Load()) } // Snapshot returns a snapshot of the metric. You must call Init at least once @@ -246,7 +263,7 @@ func (m *Metric) Snapshot() *MetricSnapshot { Type: m.typ, Help: m.help, Labels: maps.Clone(m.labels), - Value: m.value.Value(), + Value: m.get(), Bounds: slices.Clone(m.bounds), Counts: counts, } @@ -276,7 +293,7 @@ func (m *Metric) MetricValue() *protos.MetricValue { } return &protos.MetricValue{ Id: m.id, - Value: m.value.Value(), + Value: m.get(), Counts: counts, } } diff --git a/runtime/metrics/metrics_test.go b/runtime/metrics/metrics_test.go index 14dd67dbb..a9f058358 100644 --- a/runtime/metrics/metrics_test.go +++ b/runtime/metrics/metrics_test.go @@ -22,9 +22,9 @@ import ( "sync" "testing" + "github.com/ServiceWeaver/weaver/runtime/protos" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/ServiceWeaver/weaver/runtime/protos" ) const ( @@ -287,7 +287,7 @@ func BenchmarkCounter(b *testing.B) { c := Register(counterType, "BenchmarkCounter/count", "", nil) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -323,7 +323,7 @@ func BenchmarkCounterMap1(b *testing.B) { c := l.Get(labels1{"xxxxxxxxxx"}) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -333,7 +333,7 @@ func BenchmarkCounterMap2(b *testing.B) { c := l.Get(labels2{"xxxxxxxxxx", "xxxxxxxxxx"}) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -346,7 +346,7 @@ func BenchmarkCounterMap5(b *testing.B) { }) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -361,7 +361,7 @@ func BenchmarkCounterMap10(b *testing.B) { }) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -389,7 +389,7 @@ func BenchmarkCounterMap50(b *testing.B) { }) b.ResetTimer() for i := 0; i < b.N; i++ { - c.Add(1) + c.Inc() } } @@ -399,7 +399,7 @@ func BenchmarkCounterGet1(b *testing.B) { labels := labels1{"xxxxxxxxxx"} b.ResetTimer() for i := 0; i < b.N; i++ { - l.Get(labels).Add(1) + l.Get(labels).Inc() } } @@ -409,7 +409,7 @@ func BenchmarkCounterGet2(b *testing.B) { labels := labels2{"xxxxxxxxxx", "xxxxxxxxxx"} b.ResetTimer() for i := 0; i < b.N; i++ { - l.Get(labels).Add(1) + l.Get(labels).Inc() } } @@ -422,7 +422,7 @@ func BenchmarkCounterGet5(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - l.Get(labels).Add(1) + l.Get(labels).Inc() } } @@ -437,7 +437,7 @@ func BenchmarkCounterGet10(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - l.Get(labels).Add(1) + l.Get(labels).Inc() } } @@ -465,7 +465,7 @@ func BenchmarkCounterGet50(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - l.Get(labels).Add(1) + l.Get(labels).Inc() } }