Skip to content

Commit

Permalink
batcher: made timer optional + converted to options pattern for argum…
Browse files Browse the repository at this point in the history
…ents
  • Loading branch information
Ice3man543 committed Jun 22, 2023
1 parent 8dd2c29 commit 60f54e5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 20 deletions.
89 changes: 72 additions & 17 deletions batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,58 @@ import (
// FlushCallback is the callback function that will be called when the batcher is full or the flush interval is reached
type FlushCallback[T any] func([]T)

// Batcher is a batcher for any type of data
type Batcher[T any] struct {
maxCapacity int
flushInterval time.Duration

incomingData chan T
full chan bool
mustExit chan bool
done chan bool
flushInterval *time.Duration
flushCallback FlushCallback[T]

incomingData chan T
full chan bool
mustExit chan bool
done chan bool
}

// BatcherOption is the option for the batcher
type BatcherOption[T any] func(*Batcher[T])

// WithMaxCapacity sets the max capacity of the batcher
func WithMaxCapacity[T any](maxCapacity int) BatcherOption[T] {
return func(b *Batcher[T]) {
b.maxCapacity = maxCapacity
}
}

// WithFlushInterval sets the optional flush interval of the batcher
func WithFlushInterval[T any](flushInterval time.Duration) BatcherOption[T] {
return func(b *Batcher[T]) {
b.flushInterval = &flushInterval
}
}

// WithFlushCallback sets the flush callback of the batcher
func WithFlushCallback[T any](fn FlushCallback[T]) BatcherOption[T] {
return func(b *Batcher[T]) {
b.flushCallback = fn
}
}

// New creates a new batcher
func New[T any](maxCapacity int, flushInterval time.Duration, fn FlushCallback[T]) *Batcher[T] {
func New[T any](opts ...BatcherOption[T]) *Batcher[T] {
batcher := &Batcher[T]{
maxCapacity: maxCapacity,
incomingData: make(chan T, maxCapacity),
full: make(chan bool),
flushInterval: flushInterval,
mustExit: make(chan bool, 1),
done: make(chan bool, 1),
flushCallback: fn,
full: make(chan bool),
mustExit: make(chan bool, 1),
done: make(chan bool, 1),
}
for _, opt := range opts {
opt(batcher)
}
batcher.incomingData = make(chan T, batcher.maxCapacity)
if batcher.flushCallback == nil {
panic("batcher: flush callback is required")
}
if batcher.maxCapacity <= 0 {
panic("batcher: max capacity must be greater than 0")
}
return batcher
}
Expand Down Expand Up @@ -61,18 +92,30 @@ func (b *Batcher[T]) run() {
close(b.done)
}()

timer := time.NewTimer(b.flushInterval)
var timer *time.Timer
var flushInterval time.Duration
if b.flushInterval != nil {
flushInterval = *b.flushInterval
timer = time.NewTimer(flushInterval)
b.runWithTimer(timer, flushInterval)
return
}
b.runWithoutTimer()
}

// runWithTimer runs the batcher with timer
func (b *Batcher[T]) runWithTimer(timer *time.Timer, flushInterval time.Duration) {
for {
select {
case <-timer.C:
b.doCallback()
timer.Reset(b.flushInterval)
timer.Reset(flushInterval)
case <-b.full:
if !timer.Stop() {
<-timer.C
}
b.doCallback()
timer.Reset(b.flushInterval)
timer.Reset(flushInterval)
case <-b.mustExit:
if !timer.Stop() {
<-timer.C
Expand All @@ -82,6 +125,18 @@ func (b *Batcher[T]) run() {
}
}

// runWithoutTimer runs the batcher without timer
func (b *Batcher[T]) runWithoutTimer() {
for {
select {
case <-b.full:
b.doCallback()
case <-b.mustExit:
return
}
}
}

func (b *Batcher[T]) doCallback() {
n := len(b.incomingData)
if n == 0 {
Expand Down
9 changes: 6 additions & 3 deletions batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package batcher

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -15,12 +14,16 @@ func TestBatcher(t *testing.T) {
got int
gotBatches int
)
bat := New(batchSize, time.Second, func(t []int) {
callback := func(t []int) {
gotBatches++
for range t {
got++
}
})
}
bat := New[int](
WithMaxCapacity[int](batchSize),
WithFlushCallback[int](callback),
)

bat.Run()

Expand Down

0 comments on commit 60f54e5

Please sign in to comment.