Skip to content

Commit

Permalink
chore: introduce in-memory stats for testing (#2735)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Nov 28, 2022
1 parent 0ed5a82 commit 6ac8c31
Show file tree
Hide file tree
Showing 2 changed files with 374 additions and 0 deletions.
231 changes: 231 additions & 0 deletions services/stats/memstats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package memstats

import (
"context"
"sync"
"time"

"github.com/rudderlabs/rudder-server/services/stats"
)

var _ stats.Stats = (*Store)(nil)

var _ stats.Measurement = (*Measurement)(nil)

type Store struct {
mu sync.Mutex
byKey map[string]*Measurement
now func() time.Time
}

type Measurement struct {
mu sync.Mutex
startTime time.Time
now func() time.Time

tags stats.Tags
name string
mType string

sum float64
values []float64
durations []time.Duration
}

func (m *Measurement) LastValue() float64 {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.values) == 0 {
return 0
}

return m.values[len(m.values)-1]
}

func (m *Measurement) Values() []float64 {
m.mu.Lock()
defer m.mu.Unlock()

s := make([]float64, len(m.values))
copy(s, m.values)

return s
}

func (m *Measurement) LastDuration() time.Duration {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.durations) == 0 {
return 0
}

return m.durations[len(m.durations)-1]
}

func (m *Measurement) Durations() []time.Duration {
m.mu.Lock()
defer m.mu.Unlock()

s := make([]time.Duration, len(m.durations))
copy(s, m.durations)

return s
}

// Count implements stats.Measurement
func (m *Measurement) Count(n int) {
m.mu.Lock()
defer m.mu.Unlock()

if m.mType != stats.CountType {
panic("operation Count not supported for measurement type:" + m.mType)
}

m.sum += float64(n)
m.values = append(m.values, m.sum)
}

// Increment implements stats.Measurement
func (m *Measurement) Increment() {
if m.mType != stats.CountType {
panic("operation Increment not supported for measurement type:" + m.mType)
}

m.Count(1)
}

// Gauge implements stats.Measurement
func (m *Measurement) Gauge(value interface{}) {
if m.mType != stats.GaugeType {
panic("operation Gauge not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.values = append(m.values, value.(float64))
}

// Observe implements stats.Measurement
func (m *Measurement) Observe(value float64) {
if m.mType != stats.HistogramType {
panic("operation Observe not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.values = append(m.values, value)
}

// Start implements stats.Measurement
func (m *Measurement) Start() {
if m.mType != stats.TimerType {
panic("operation Start not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.startTime = m.now()
}

// End implements stats.Measurement
func (m *Measurement) End() {
if m.mType != stats.TimerType {
panic("operation End not supported for measurement type:" + m.mType)
}

m.SendTiming(m.now().Sub(m.startTime))
}

// Since implements stats.Measurement
func (m *Measurement) Since(start time.Time) {
if m.mType != stats.TimerType {
panic("operation Since not supported for measurement type:" + m.mType)
}

m.SendTiming(m.now().Sub(start))
}

// SendTiming implements stats.Measurement
func (m *Measurement) SendTiming(duration time.Duration) {
if m.mType != stats.TimerType {
panic("operation SendTiming not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.durations = append(m.durations, duration)
}

type Opts func(*Store)

func WithNow(nowFn func() time.Time) Opts {
return func(s *Store) {
s.now = nowFn
}
}

func New(opts ...Opts) *Store {
s := &Store{
byKey: make(map[string]*Measurement),
now: time.Now,
}

for _, opt := range opts {
opt(s)
}

return s
}

// NewStat implements stats.Stats
func (ms *Store) NewStat(name, statType string) (m stats.Measurement) {
return ms.NewTaggedStat(name, statType, nil)
}

// NewTaggedStat implements stats.Stats
func (ms *Store) NewTaggedStat(name, statType string, tags stats.Tags) stats.Measurement {
return ms.NewSampledTaggedStat(name, statType, tags)
}

// NewSampledTaggedStat implements stats.Stats
func (ms *Store) NewSampledTaggedStat(name, statType string, tags stats.Tags) stats.Measurement {
ms.mu.Lock()
defer ms.mu.Unlock()

m := &Measurement{
name: name,
tags: tags,
mType: statType,

now: ms.now,
}

ms.byKey[ms.getKey(name, tags)] = m
return m
}

// Get the stored measurement with the name and tags.
// If no measurement is found, nil is returned.
func (ms *Store) Get(name string, tags stats.Tags) *Measurement {
ms.mu.Lock()
defer ms.mu.Unlock()

return ms.byKey[ms.getKey(name, tags)]
}

// Start implements stats.Stats
func (*Store) Start(_ context.Context) {}

// Stop implements stats.Stats
func (*Store) Stop() {}

// getKey maps name and tags, to a store lookup key.
func (*Store) getKey(name string, tags stats.Tags) string {
return name + tags.String()
}
143 changes: 143 additions & 0 deletions services/stats/memstats/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package memstats_test

import (
"context"
"testing"
"time"

"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/stats/memstats"
"github.com/stretchr/testify/require"
)

func TestStats(t *testing.T) {
now := time.Now()

store := memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)

commonTags := stats.Tags{"tag1": "value1"}

t.Run("test Count", func(t *testing.T) {
name := "testCount"

m := store.NewTaggedStat(name, stats.CountType, commonTags)

m.Increment()

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Count(2)

require.Equal(t, 3.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 3.0}, store.Get(name, commonTags).Values())
})

t.Run("test Gauge", func(t *testing.T) {
name := "testGauge"
m := store.NewTaggedStat(name, stats.GaugeType, commonTags)

m.Gauge(1.0)

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Gauge(2.0)

require.Equal(t, 2.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values())
})

t.Run("test Histogram", func(t *testing.T) {
name := "testHistogram"
m := store.NewTaggedStat(name, stats.HistogramType, commonTags)

m.Observe(1.0)

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Observe(2.0)

require.Equal(t, 2.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values())
})

t.Run("test Timer", func(t *testing.T) {
name := "testTimer"

m := store.NewTaggedStat(name, stats.TimerType, commonTags)

m.SendTiming(time.Second)
require.Equal(t, time.Second, store.Get(name, commonTags).LastDuration())
require.Equal(t, []time.Duration{time.Second}, store.Get(name, commonTags).Durations())

m.SendTiming(time.Minute)
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute},
store.Get(name, commonTags).Durations(),
)

m.Start()
now = now.Add(time.Hour)
m.End()
require.Equal(t, time.Hour, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute, time.Hour},
store.Get(name, commonTags).Durations(),
)

m.Since(now.Add(-time.Minute))
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute, time.Hour, time.Minute},
store.Get(name, commonTags).Durations(),
)
})

t.Run("invalid operations", func(t *testing.T) {
require.PanicsWithValue(t, "operation Count not supported for measurement type:gauge", func() {
store.NewTaggedStat("invalid_count", stats.GaugeType, commonTags).Count(1)
})
require.PanicsWithValue(t, "operation Increment not supported for measurement type:gauge", func() {
store.NewTaggedStat("invalid_inc", stats.GaugeType, commonTags).Increment()
})
require.PanicsWithValue(t, "operation Gauge not supported for measurement type:count", func() {
store.NewTaggedStat("invalid_gauge", stats.CountType, commonTags).Gauge(1)
})
require.PanicsWithValue(t, "operation SendTiming not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_send_timing", stats.HistogramType, commonTags).SendTiming(time.Second)
})
require.PanicsWithValue(t, "operation Start not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_start", stats.HistogramType, commonTags).Start()
})
require.PanicsWithValue(t, "operation End not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_end", stats.HistogramType, commonTags).End()
})
require.PanicsWithValue(t, "operation Since not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_since", stats.HistogramType, commonTags).Since(time.Now())
})
require.PanicsWithValue(t, "operation Observe not supported for measurement type:timer", func() {
store.NewTaggedStat("invalid_observe", stats.TimerType, commonTags).Observe(1)
})
})

t.Run("no op", func(t *testing.T) {
store.Start(context.Background())
store.Stop()
})

t.Run("no tags", func(t *testing.T) {
name := "no_tags"
m := store.NewStat(name, stats.CountType)

m.Increment()

require.Equal(t, 1.0, store.Get(name, nil).LastValue())
})
}

0 comments on commit 6ac8c31

Please sign in to comment.