Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add StartContext, remove Stop #147

Merged
merged 1 commit into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 18 additions & 63 deletions stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stats

import (
"context"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,18 +41,16 @@ type Store interface {
// and flush all the Counters and Gauges registered with it.
Flush()

// Start a timer for periodic stat flushes.
//
// If Start is called multiple times, the previous ticker is
// stopped and a new replacement ticker is started. It is
// equivalent to calling Stop() and then Start() with the new
// ticker.
// Start a timer for periodic stat flushes. This is a blocking
// call and should be called in a goroutine.
Start(*time.Ticker)

// Stop stops the running periodic flush timer.
// StartContext starts a timer for periodic stat flushes. This is
// a blocking call and should be called in a goroutine.
//
// If no periodic flush is currently active, this is a no-op.
Stop()
// If the passed-in context is cancelled, then this call
// exits. Flush will be called on exit.
StartContext(context.Context, *time.Ticker)

// Add a StatGenerator to the Store that programatically generates stats.
AddStatGenerator(StatGenerator)
Expand Down Expand Up @@ -343,68 +342,24 @@ type statStore struct {

mu sync.RWMutex
statGenerators []StatGenerator
stop chan bool
wg *sync.WaitGroup

sink Sink
}

func (s *statStore) Start(ticker *time.Ticker) {
s.mu.Lock()
defer s.mu.Unlock()

// if there is already a stop channel allocated, that means there
// is a ticker running - we will replace the ticker now.
if s.stop != nil {
s.stopLocked()
}

stopChan := make(chan bool, 1)
wg := &sync.WaitGroup{}
wg.Add(1)

s.stop = stopChan
s.wg = wg

go func() {
defer wg.Done()
for {
select {
case <-ticker.C:
s.Flush()
case <-stopChan:
return
}
func (s *statStore) StartContext(ctx context.Context, ticker *time.Ticker) {
for {
select {
case <-ctx.Done():
s.Flush()
return
tomwans marked this conversation as resolved.
Show resolved Hide resolved
case <-ticker.C:
s.Flush()
}
}()
}

// stopLocked is the core of the stop implementation, but without a
// lock.
func (s *statStore) stopLocked() {
// if the stop channel is nil, there is no ticker running, so this
// is a no-op.
if s.stop == nil {
return
}

// close to make the flush goroutine stop
close(s.stop)

// wait for the flush goroutine to fully stop
s.wg.Wait()

// nil out the stop channel
s.stop = nil

// nil out the wait group for tidyness
s.wg = nil
}

func (s *statStore) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
s.stopLocked()
func (s *statStore) Start(ticker *time.Ticker) {
s.StartContext(context.Background(), ticker)
}

func (s *statStore) Flush() {
Expand Down
74 changes: 17 additions & 57 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stats

import (
"context"
crand "crypto/rand"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -43,70 +44,29 @@ func TestStats(t *testing.T) {
wg.Wait()
}

// TestStatsStartMultipleTimes ensures starting a periodic flush twice
// works as expected.
func TestStatsStartMultipleTimes(t *testing.T) {
// TestStatsStartContext ensures that a cancelled context cancels a
// flushing goroutine.
func TestStatsStartContext(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

// we check to make sure the two stop channels are different,
// that's the best we can do.
store.Start(time.NewTicker(1 * time.Minute))
ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(1 * time.Minute)

// grab the stop channel
realStore := store.(*statStore)
stopChan1 := realStore.stop
wg := &sync.WaitGroup{}

store.Start(time.NewTicker(10 * time.Hour))

// grab the new stop channel
stopChan2 := realStore.stop

if stopChan1 == stopChan2 {
t.Error("two stop channels are the same")
}
}

// TestStatsStartStop ensures starting a periodic flush can be
// stopped.
func TestStatsStartStop(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

store.Start(time.NewTicker(1 * time.Minute))
store.Stop()

realStore := store.(*statStore)

// we check to make sure the two stop channel is nil'ed out, that
// is the best we can do to avoid flakey time based tests.
if realStore.stop != nil {
t.Errorf("expected stop channel to be nil")
}
}

// TestStatsStartStopMultipleTimes ensures starting a periodic flush
// can be stopped, even if called multiple times.
func TestStatsStartStopMultipleTimes(t *testing.T) {
sink := &testStatSink{}
store := NewStore(sink, true)

// ensure we can call it if no ticker was ever started
store.Stop()

// start one, and stop many times.
store.Start(time.NewTicker(1 * time.Minute))
store.Stop()
store.Stop()
store.Stop()
wg.Add(1)
go func() {
defer wg.Done()
store.StartContext(ctx, tick)
}()

realStore := store.(*statStore)
// now we cancel, and its ok to do this at any point - the
// goroutine above could have started or not started, either case
// is ok.
cancel()

// we check to make sure the two stop channel is nil'ed out, that
// is the best we can do to avoid flakey time based tests.
if realStore.stop != nil {
t.Errorf("expected stop channel to be nil")
}
wg.Wait()
}

// Ensure timers and timespans are working
Expand Down