Merge branch 'main' into attr-filter-revert
hanyuancheung authored Aug 21, 2023
2 parents d193476 + f15ae16 commit 1fc005d
Showing 3 changed files with 107 additions and 76 deletions.
99 changes: 43 additions & 56 deletions sdk/metric/internal/aggregate/exponential_histogram.go
@@ -38,43 +38,6 @@ const (
	minInt64 int64 = math.MinInt64
)

-// expoHistogramValues summarizes a set of measurements as expoHistogramDataPoints using
-// dynamically scaled buckets.
-type expoHistogramValues[N int64 | float64] struct {
-	noSum    bool
-	noMinMax bool
-	maxSize  int
-	maxScale int
-
-	values   map[attribute.Set]*expoHistogramDataPoint[N]
-	valuesMu sync.Mutex
-}
-
-func newExpoHistValues[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramValues[N] {
-	return &expoHistogramValues[N]{
-		noSum:    noSum,
-		noMinMax: noMinMax,
-		maxSize:  maxSize,
-		maxScale: maxScale,
-
-		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
-	}
-}
-
-// measure records the measurement, scoped by attr, and aggregates it
-// into an aggregation.
-func (e *expoHistogramValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
-
-	v, ok := e.values[attr]
-	if !ok {
-		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
-		e.values[attr] = v
-	}
-	v.record(value)
-}
-
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
	count uint64
@@ -134,7 +97,7 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
		return
	}

-	bin := getBin(absV, p.scale)
+	bin := p.getBin(absV)

	bucket := &p.posBuckets
	if v < 0 {
@@ -143,7 +106,7 @@ func (p *expoHistogramDataPoint[N]) record(v N) {

	// If the new bin would make the counts larger than maxSize, we need to
	// downscale current measurements.
-	if scaleDelta := scaleChange(bin, bucket.startBin, len(bucket.counts), p.maxSize); scaleDelta > 0 {
+	if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
		if p.scale-scaleDelta < expoMinScale {
			// With a scale of -10 there are only two buckets for the whole range of float64 values.
			// This can only happen if there is a max size of 1.
@@ -155,27 +118,26 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
		p.posBuckets.downscale(scaleDelta)
		p.negBuckets.downscale(scaleDelta)

-		bin = getBin(absV, p.scale)
+		bin = p.getBin(absV)
	}

	bucket.record(bin)
}

-// getBin returns the bin of the bucket that the value v should be recorded
-// into at the given scale.
-func getBin(v float64, scale int) int {
+// getBin returns the bin v should be recorded into.
+func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
	frac, exp := math.Frexp(v)
-	if scale <= 0 {
+	if p.scale <= 0 {
		// Because of the choice of fraction, exp is always one power of two
		// higher than we want.
		correction := 1
		if frac == .5 {
			// If v is an exact power of two the frac will be .5 and the exp
			// will be one higher than we want.
			correction = 2
		}
-		return (exp - correction) >> (-scale)
+		return (exp - correction) >> (-p.scale)
	}
-	return exp<<scale + int(math.Log(frac)*scaleFactors[scale]) - 1
+	return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
}
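Worth pausing on the math here. math.Frexp decomposes v as frac * 2^exp with frac in [0.5, 1), so exp comes out one power of two higher than the bucket boundary wants, and two higher when v is an exact power of two; at a scale s <= 0, bucket i then covers the interval (2^(i*2^-s), 2^((i+1)*2^-s)]. The following is a minimal standalone sketch of the non-positive-scale branch, not SDK code; binAtScale is a hypothetical name:

package main

import (
	"fmt"
	"math"
)

// binAtScale mirrors the scale <= 0 branch of getBin.
func binAtScale(v float64, scale int) int {
	frac, exp := math.Frexp(v) // v == frac * 2^exp, frac in [0.5, 1)
	correction := 1
	if frac == .5 { // exact powers of two report one exponent too high
		correction = 2
	}
	return (exp - correction) >> (-scale)
}

func main() {
	for _, v := range []float64{1, 2, 3, 4, 5, 8} {
		fmt.Printf("v=%v -> bin %d at scale 0\n", v, binAtScale(v, 0))
	}
	// Prints bins -1, 0, 1, 1, 2, 2: e.g. 4 falls in (2, 4] (bin 1)
	// and 5 falls in (4, 8] (bin 2).
}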

// scaleFactors are constants used in calculating the logarithm index. They are
@@ -204,8 +166,9 @@ var scaleFactors = [21]float64{
	math.Ldexp(math.Log2E, 20),
}

-// scaleChange returns the magnitude of the scale change needed to fit bin in the bucket.
-func scaleChange(bin, startBin, length, maxSize int) int {
+// scaleChange returns the magnitude of the scale change needed to fit bin in
+// the bucket. If no scale change is needed 0 is returned.
+func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
	if length == 0 {
		// No need to rescale if there are no buckets.
		return 0
@@ -219,7 +182,7 @@ func scaleChange(bin, startBin, length, maxSize int) int {
	}

	count := 0
-	for high-low >= maxSize {
+	for high-low >= p.maxSize {
		low = low >> 1
		high = high >> 1
		count++
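The loop above is the entire rescaling rule: each one-step downscale merges adjacent buckets, which halves every bucket index (an arithmetic right shift), and the loop counts how many halvings the span needs before it fits in maxSize buckets. A standalone re-derivation of that reading, with hypothetical names:

package main

import "fmt"

// neededScaleChange shifts the span's endpoints right until the
// occupied bin range fits within maxSize buckets.
func neededScaleChange(low, high, maxSize int) int {
	count := 0
	for high-low >= maxSize {
		low >>= 1
		high >>= 1
		count++
	}
	return count
}

func main() {
	// With maxSize 4, bins 0..6 need one downscale (0..6 -> 0..3).
	fmt.Println(neededScaleChange(0, 6, 4)) // 1
	// Bins 0..15 need two (0..15 -> 0..7 -> 0..3).
	fmt.Println(neededScaleChange(0, 15, 4)) // 2
}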
@@ -327,24 +290,48 @@ func (b *expoBuckets) downscale(delta int) {
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
	return &expoHistogram[N]{
-		expoHistogramValues: newExpoHistValues[N](
-			int(maxSize),
-			int(maxScale),
-			noMinMax,
-			noSum,
-		),
+		noSum:    noSum,
+		noMinMax: noMinMax,
+		maxSize:  int(maxSize),
+		maxScale: int(maxScale),
+
+		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

		start: now(),
	}
}

// expoHistogram summarizes a set of measurements as a histogram with exponentially
// defined buckets.
type expoHistogram[N int64 | float64] struct {
-	*expoHistogramValues[N]
+	noSum    bool
+	noMinMax bool
+	maxSize  int
+	maxScale int
+
+	values   map[attribute.Set]*expoHistogramDataPoint[N]
+	valuesMu sync.Mutex

	start time.Time
}

+func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
+	// Ignore NaN and infinity.
+	if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
+		return
+	}
+
+	e.valuesMu.Lock()
+	defer e.valuesMu.Unlock()
+
+	v, ok := e.values[attr]
+	if !ok {
+		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+		e.values[attr] = v
+	}
+	v.record(value)
+}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
	t := now()

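With expoHistogramValues folded into expoHistogram, the inlined measure is also the single place where non-finite values are rejected, before any data point is created or locked. A hedged sketch of the resulting behavior, written as a hypothetical test in this package (alice is the attribute.Set fixture the existing tests use):

func TestMeasureIgnoresNonFinite(t *testing.T) {
	h := newExponentialHistogram[float64](4, 20, false, false)

	h.measure(context.Background(), math.NaN(), alice)  // returns early: no data point created
	h.measure(context.Background(), math.Inf(1), alice) // likewise ignored
	h.measure(context.Background(), 1.5, alice)         // the only recorded value

	if _, ok := h.values[alice]; !ok {
		t.Fatal("expected a data point for alice after the finite measurement")
	}
	if got := len(h.values); got != 1 {
		t.Fatalf("expected 1 data point, got %d", got)
	}
}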
82 changes: 63 additions & 19 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
@@ -45,10 +45,10 @@ func withHandler(t *testing.T) func() {

func TestExpoHistogramDataPointRecord(t *testing.T) {
	t.Run("float64", testExpoHistogramDataPointRecord[float64])
-	t.Run("float64 MinMaxSum", testExpoHistogramDataPointRecordMinMaxSum[float64])
+	t.Run("float64 MinMaxSum", testExpoHistogramMinMaxSumFloat64)
	t.Run("float64-2", testExpoHistogramDataPointRecordFloat64)
	t.Run("int64", testExpoHistogramDataPointRecord[int64])
-	t.Run("int64 MinMaxSum", testExpoHistogramDataPointRecordMinMaxSum[int64])
+	t.Run("int64 MinMaxSum", testExpoHistogramMinMaxSumInt64)
}

// TODO: This can be defined in the test after we drop support for go1.19.
@@ -171,15 +171,15 @@ type expoHistogramDataPointRecordMinMaxSumTestCase[N int64 | float64] struct {
	expected expectedMinMaxSum[N]
}

-func testExpoHistogramDataPointRecordMinMaxSum[N int64 | float64](t *testing.T) {
-	testCases := []expoHistogramDataPointRecordMinMaxSumTestCase[N]{
+func testExpoHistogramMinMaxSumInt64(t *testing.T) {
+	testCases := []expoHistogramDataPointRecordMinMaxSumTestCase[int64]{
		{
-			values:   []N{2, 4, 1},
-			expected: expectedMinMaxSum[N]{1, 4, 7, 3},
+			values:   []int64{2, 4, 1},
+			expected: expectedMinMaxSum[int64]{1, 4, 7, 3},
		},
		{
-			values:   []N{4, 4, 4, 2, 16, 1},
-			expected: expectedMinMaxSum[N]{1, 16, 31, 6},
+			values:   []int64{4, 4, 4, 2, 16, 1},
+			expected: expectedMinMaxSum[int64]{1, 16, 31, 6},
		},
	}

@@ -188,10 +188,53 @@ func testExpoHistogramDataPointRecordMinMaxSum[N int64 | float64](t *testing.T)
			restore := withHandler(t)
			defer restore()

-			dp := newExpoHistogramDataPoint[N](4, 20, false, false)
+			h := newExponentialHistogram[int64](4, 20, false, false)
			for _, v := range tt.values {
-				dp.record(v)
+				h.measure(context.Background(), v, alice)
			}
+			dp := h.values[alice]

			assert.Equal(t, tt.expected.max, dp.max)
			assert.Equal(t, tt.expected.min, dp.min)
			assert.Equal(t, tt.expected.sum, dp.sum)
		})
	}
}

+func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
+	testCases := []expoHistogramDataPointRecordMinMaxSumTestCase[float64]{
+		{
+			values:   []float64{2, 4, 1},
+			expected: expectedMinMaxSum[float64]{1, 4, 7, 3},
+		},
+		{
+			values:   []float64{2, 4, 1, math.Inf(1)},
+			expected: expectedMinMaxSum[float64]{1, 4, 7, 4},
+		},
+		{
+			values:   []float64{2, 4, 1, math.Inf(-1)},
+			expected: expectedMinMaxSum[float64]{1, 4, 7, 4},
+		},
+		{
+			values:   []float64{2, 4, 1, math.NaN()},
+			expected: expectedMinMaxSum[float64]{1, 4, 7, 4},
+		},
+		{
+			values:   []float64{4, 4, 4, 2, 16, 1},
+			expected: expectedMinMaxSum[float64]{1, 16, 31, 6},
+		},
+	}
+
+	for _, tt := range testCases {
+		t.Run(fmt.Sprint(tt.values), func(t *testing.T) {
+			restore := withHandler(t)
+			defer restore()
+
+			h := newExponentialHistogram[float64](4, 20, false, false)
+			for _, v := range tt.values {
+				h.measure(context.Background(), v, alice)
+			}
+			dp := h.values[alice]
+
+			assert.Equal(t, tt.expected.max, dp.max)
+			assert.Equal(t, tt.expected.min, dp.min)
@@ -614,7 +657,8 @@ func TestScaleChange(t *testing.T) {
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
-			got := scaleChange(tt.args.bin, tt.args.startBin, tt.args.length, tt.args.maxSize)
+			p := newExpoHistogramDataPoint[float64](tt.args.maxSize, 20, false, false)
+			got := p.scaleChange(tt.args.bin, tt.args.startBin, tt.args.length)
			if got != tt.want {
				t.Errorf("scaleChange() = %v, want %v", got, tt.want)
			}
@@ -886,15 +930,15 @@ func FuzzGetBin(f *testing.F) {
			t.Skip("skipping test for zero")
		}

-		// GetBin is only used with a range of -10 to 20.
-		scale = (scale%31+31)%31 - 10
-
-		got := getBin(v, scale)
-		if v <= lowerBound(got, scale) {
-			t.Errorf("v=%x scale =%d had bin %d, but was below lower bound %x", v, scale, got, lowerBound(got, scale))
+		p := newExpoHistogramDataPoint[float64](4, 20, false, false)
+		// scale range is -10 to 20.
+		p.scale = (scale%31+31)%31 - 10
+		got := p.getBin(v)
+		if v <= lowerBound(got, p.scale) {
+			t.Errorf("v=%x scale =%d had bin %d, but was below lower bound %x", v, p.scale, got, lowerBound(got, p.scale))
		}
-		if v > lowerBound(got+1, scale) {
-			t.Errorf("v=%x scale =%d had bin %d, but was above upper bound %x", v, scale, got, lowerBound(got+1, scale))
+		if v > lowerBound(got+1, p.scale) {
+			t.Errorf("v=%x scale =%d had bin %d, but was above upper bound %x", v, p.scale, got, lowerBound(got+1, p.scale))
		}
	})
}
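Two details in the test changes are easy to miss. The generic MinMaxSum helper was split into typed int64 and float64 variants, presumably because the new non-finite inputs (math.Inf, math.NaN) only exist for float64. And the fuzz harness now normalizes the fuzzed integer into the valid scale range [-10, 20] before assigning it to p.scale; a small sketch of that arithmetic, with a hypothetical name:

// clampScale maps an arbitrary fuzzed int onto [-10, 20]: the double
// mod yields a non-negative residue in [0, 30], and -10 shifts it.
func clampScale(scale int) int {
	return (scale%31+31)%31 - 10
}

// clampScale(-3) == 18 (since -3%31 == -3 in Go, and (-3+31)%31 == 28)
// clampScale(42) == 1  (since 42%31 == 11)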
2 changes: 1 addition & 1 deletion sdk/metric/reader.go
@@ -40,7 +40,7 @@ var errNonPositiveDuration = fmt.Errorf("non-positive duration")
// Reader is the interface used between the SDK and an
// exporter. Control flow is bi-directional through the
// Reader, since the SDK initiates ForceFlush and Shutdown
-// while the initiates collection. The Register() method here
+// while the exporter initiates collection. The Register() method here
// informs the Reader that it can begin reading, signaling the
// start of bi-directional control flow.
//
