Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed-up metric maintenance for component method calls. #440

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,6 @@ github.com/ServiceWeaver/weaver/runtime/logging
time
github.com/ServiceWeaver/weaver/runtime/metrics
encoding/binary
expvar
fmt
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
Expand Down
5 changes: 5 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (c *Counter) Name() string {
return c.impl.Name()
}

// Inc increases the counter by one.
func (c *Counter) Inc() {
c.impl.Inc()
}

// Add increases the counter by delta. It panics if the delta is negative.
func (c *Counter) Add(delta float64) {
c.impl.Add(delta)
Expand Down
4 changes: 2 additions & 2 deletions runtime/codegen/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (m *MethodMetrics) Begin() MethodCallHandle {
// End ends metric update recording for a call to method m.
func (m *MethodMetrics) End(h MethodCallHandle, failed bool, requestBytes, replyBytes int) {
latency := time.Now().UnixMicro() - h.start
m.Count.Add(1)
m.Count.Inc()
if failed {
m.ErrorCount.Add(1)
m.ErrorCount.Inc()
}
m.Latency.Put(float64(latency))
if m.remote {
Expand Down
44 changes: 44 additions & 0 deletions runtime/metrics/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"math"
"sync/atomic"
)

// atomicFloat64 provides atomic storage for float64.
type atomicFloat64 struct {
v atomic.Uint64 // stores result of math.Float64bits
}

// get returns the current value stored in f.
func (f *atomicFloat64) get() float64 { return math.Float64frombits(f.v.Load()) }

// set stores v in f.
func (f *atomicFloat64) set(v float64) { f.v.Store(math.Float64bits(v)) }

// add atomically adds v to f.
func (f *atomicFloat64) add(v float64) {
// Use compare-and-swap to change the stored representation
// atomically from old value to old value + v.
for {
cur := f.v.Load()
next := math.Float64bits(math.Float64frombits(cur) + v)
if f.v.CompareAndSwap(cur, next) {
return
}
}
}
75 changes: 75 additions & 0 deletions runtime/metrics/atomic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"math"
"sync"
"testing"
)

var testFloats = []float64{math.Inf(-1), -2, -1, 0.5, -0, 0, 0.5, 1, 2, math.Inf(1)}

func TestAtomicFloat64GetSet(t *testing.T) {
var f atomicFloat64
for _, v := range testFloats {
f.set(v)
if got := f.get(); got != v {
t.Errorf("get(set(%v)) = %v", v, got)
}
}
}

func TestAtomicFloat64Add(t *testing.T) {
for _, a := range testFloats {
for _, b := range testFloats {
var v atomicFloat64
v.add(a)
v.add(b)
want := a + b
got := v.get()

same := (math.IsNaN(want) && math.IsNaN(got)) || (want == got)
if !same {
t.Errorf("Add(%v) to %v produced %v, expecting %v", b, a, got, want)
}
}
}
}

func TestAtomicFloat64Parallelism(t *testing.T) {
var f atomicFloat64

// Add to f from multiple goroutines in parallel.
const iters = 1000000
const parallelism = 8
var wg sync.WaitGroup
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func() {
defer wg.Done()
for j := 0; j < iters; j++ {
f.add(1.0)
}
}()
}
wg.Wait()

want := float64(iters * parallelism)
got := f.get()
if got != want {
t.Errorf("concurrent adds produced %v, expecting %v", got, want)
}
}
6 changes: 3 additions & 3 deletions runtime/metrics/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package metrics
import (
"fmt"

"golang.org/x/exp/maps"
"github.com/ServiceWeaver/weaver/runtime/protos"
"golang.org/x/exp/maps"
)

// An Exporter produces MetricUpdates summarizing the change in metrics over
Expand All @@ -42,13 +42,13 @@ func (e *Exporter) Export() *protos.MetricUpdate {
metric.Init()
latest, ok := e.versions[metric.id]
if !ok {
e.versions[metric.id] = metric.Version()
e.versions[metric.id] = metric.version.Load()
update.Defs = append(update.Defs, metric.MetricDef())
update.Values = append(update.Values, metric.MetricValue())
continue
}

version := metric.Version()
version := metric.version.Load()
if version == latest {
continue
}
Expand Down
51 changes: 34 additions & 17 deletions runtime/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package metrics

import (
"encoding/binary"
"expvar"
"fmt"
"math"
"sort"
Expand Down Expand Up @@ -68,11 +67,13 @@ type Metric struct {
once sync.Once // used to initialize id and labels
id uint64 // globally unique metric id
labels map[string]string // materialized labels from calling labelsThunk
fvalue atomicFloat64 // value for Counter and Gauge, sum for Histogram
ivalue atomic.Uint64 // integer increments for Counter (separated for speed)

version atomic.Uint64 // incremented on every update, for change detection
value expvar.Float // value for Counter and Gauge, sum for Histogram
bounds []float64 // histogram bounds
counts []atomic.Uint64 // histogram counts
version atomic.Uint64 // incremented on every update, for change detection

bounds []float64 // histogram bounds
counts []atomic.Uint64 // histogram counts
}

// A MetricSnapshot is a snapshot of a metric.
Expand Down Expand Up @@ -185,32 +186,48 @@ func (m *Metric) Name() string {
return m.name
}

// Inc adds one to the metric value.
func (m *Metric) Inc() {
m.ivalue.Add(1)
}

// Add adds the provided delta to the metric's value.
func (m *Metric) Add(delta float64) {
m.value.Add(delta)
m.fvalue.add(delta)
m.version.Add(1)
}

// Sub subtracts the provided delta from the metric's value.
func (m *Metric) Sub(delta float64) {
m.value.Add(-delta)
m.fvalue.add(-delta)
m.version.Add(1)
}

// Set sets the metric's value.
func (m *Metric) Set(val float64) {
m.value.Set(val)
m.fvalue.set(val)
m.version.Add(1)
}

// Put adds the provided value to the metric's histogram.
func (m *Metric) Put(val float64) {
idx := sort.SearchFloat64s(m.bounds, val)
if idx < len(m.bounds) && val == m.bounds[idx] {
idx++
var idx int
if len(m.bounds) == 0 || val < m.bounds[0] {
// Skip binary search for values that fall in the first bucket
// (often true for short latency operations).
} else {
idx = sort.SearchFloat64s(m.bounds, val)
if idx < len(m.bounds) && val == m.bounds[idx] {
idx++
}
}
m.counts[idx].Add(1)
m.value.Add(val)

// Microsecond latencies are often zero for very fast functions.
if val != 0 {
m.fvalue.add(val)
}

m.version.Add(1)
}

Expand All @@ -225,9 +242,9 @@ func (m *Metric) Init() {
})
}

// Version returns the metric's version.
func (m *Metric) Version() uint64 {
return m.version.Load()
// get returns the current value (sum of all added values for histograms).
func (m *Metric) get() float64 {
return m.fvalue.get() + float64(m.ivalue.Load())
}

// Snapshot returns a snapshot of the metric. You must call Init at least once
Expand All @@ -246,7 +263,7 @@ func (m *Metric) Snapshot() *MetricSnapshot {
Type: m.typ,
Help: m.help,
Labels: maps.Clone(m.labels),
Value: m.value.Value(),
Value: m.get(),
Bounds: slices.Clone(m.bounds),
Counts: counts,
}
Expand Down Expand Up @@ -276,7 +293,7 @@ func (m *Metric) MetricValue() *protos.MetricValue {
}
return &protos.MetricValue{
Id: m.id,
Value: m.value.Value(),
Value: m.get(),
Counts: counts,
}
}
Expand Down
Loading