Skip to content

Commit

Permalink
Change Batcher.Start() to return channels
Browse files Browse the repository at this point in the history
This is more useful for client code, and is a more common Go pattern. As
a result the goroutine is launched within start.
  • Loading branch information
otoolep committed May 28, 2015
1 parent 8ecb5a8 commit 4852655
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
52 changes: 30 additions & 22 deletions batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,46 @@ type PointBatcherStats struct {
TimeoutTotal uint64 // Nubmer of timeouts that occurred.
}

// Start starts the batching process. It should be called from a goroutine.
func (b *PointBatcher) Start(in <-chan Point, out chan<- []Point) {
// Start starts the batching process. Returns the in and out channels for points
// batch point-batches respectively.
func (b *PointBatcher) Start() (chan<- Point, <-chan []Point) {
var timer *time.Timer
var batch []Point
var timerCh <-chan time.Time

for {
select {
case p := <-in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
batch = make([]Point, 0, b.size)
timer = time.NewTimer(b.duration)
timerCh = timer.C
}
in := make(chan Point)
out := make(chan []Point)

go func() {
for {
select {
case p := <-in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
batch = make([]Point, 0, b.size)
timer = time.NewTimer(b.duration)
timerCh = timer.C
}

batch = append(batch, p)
if len(batch) >= b.size { // 0 means send immediately.
atomic.AddUint64(&b.stats.SizeTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
timerCh = nil
}

batch = append(batch, p)
if len(batch) >= b.size { // 0 means send immediately.
atomic.AddUint64(&b.stats.SizeTotal, 1)
case <-timerCh:
atomic.AddUint64(&b.stats.TimeoutTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
timerCh = nil
}

case <-timerCh:
atomic.AddUint64(&b.stats.TimeoutTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
}
}
}()

return in, out
}

// Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be
Expand Down
15 changes: 3 additions & 12 deletions batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ func TestBatch_Size(t *testing.T) {
t.Fatal("failed to create batcher for size test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
go func() {
Expand All @@ -39,10 +36,7 @@ func TestBatch_Timeout(t *testing.T) {
t.Fatal("failed to create batcher for timeout test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
go func() {
Expand All @@ -65,10 +59,7 @@ func TestBatch_MultipleBatches(t *testing.T) {
t.Fatal("failed to create batcher for size test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
var b []Point
Expand Down

0 comments on commit 4852655

Please sign in to comment.