Skip to content

Commit

Permalink
fixup! output: concurrent periodic flusher
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Oct 19, 2021
1 parent a173d72 commit 6df9b57
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions output/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type PeriodicFlusher struct {
stop chan struct{}
stopped chan struct{}
once *sync.Once
run func()
}

// Stop waits for the periodic flusher flush one last time and exit. You can
Expand All @@ -105,7 +104,7 @@ func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFl
once: &sync.Once{},
}

pf.run = func() {
go func() {
ticker := time.NewTicker(pf.period)
for {
select {
Expand All @@ -118,22 +117,19 @@ func NewPeriodicFlusher(period time.Duration, flushCallback func()) (*PeriodicFl
return
}
}
}

go pf.run()
}()
return pf, nil
}

// NewAsyncPeriodicFlusher creates a new PeriodicFlusher and starts
// many goroutines equal to the expected provided limit.
// Concurrency is overwritten in case the value is less than one.
// many goroutines equal to the expected provided concurrency limit.
func NewAsyncPeriodicFlusher(period time.Duration, concurrency int, flushCallback func()) (*PeriodicFlusher, error) {
if period <= 0 {
return nil, fmt.Errorf("metric flush period should be positive but was %s", period)
}

if concurrency < 1 {
concurrency = 1
return nil, fmt.Errorf("concurrency must be positive but was %d", concurrency)
}

pf := &PeriodicFlusher{
Expand All @@ -144,10 +140,12 @@ func NewAsyncPeriodicFlusher(period time.Duration, concurrency int, flushCallbac
once: &sync.Once{},
}

pf.run = func() {
ticker := time.NewTicker(pf.period)
wg := sync.WaitGroup{}
limiter := make(chan struct{}, concurrency)
go func() {
var (
ticker = time.NewTicker(pf.period)
limiter = make(chan struct{}, concurrency)
wg = sync.WaitGroup{}
)
for {
select {
case <-ticker.C:
Expand All @@ -168,8 +166,6 @@ func NewAsyncPeriodicFlusher(period time.Duration, concurrency int, flushCallbac
return
}
}
}

go pf.run()
}()
return pf, nil
}

0 comments on commit 6df9b57

Please sign in to comment.