This repository has been archived by the owner on Jul 29, 2021. It is now read-only.

Merge pull request #14 from plasne/make-watcher-done-implicit
watcher done() is now implicit
plasne authored Feb 25, 2021
2 parents 5d356a2 + d12e1d6 commit 5920ef3
Showing 9 changed files with 68 additions and 281 deletions.
15 changes: 6 additions & 9 deletions README.md
@@ -138,9 +138,8 @@ func main() {
http.HandleFunc("/ingest", func(res http.ResponseWriter, req *http.Request) {

// create a batch watcher
watcher := gobatcher.NewWatcher(func(batch []*gobatcher.Operation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// process the batch
done()
}).WithMaxAttempts(3)

// enqueue operations
@@ -196,7 +195,7 @@ batcher := gobatcher.NewBatcherWithBuffer(buffer).

- __AuditInterval__ [DEFAULT: 10s]: This determines how often the Target is audited to ensure it is accurate. The Target is manipulated with atomic operations and abandoned batches are cleaned up after MaxOperationTime, so the Target should always be accurate; therefore, you should expect to see only "audit-pass" and "audit-skip" events. The audit is a failsafe: if the buffer is empty, the MaxOperationTime (on the Batcher only; Watcher settings are ignored) has been exceeded, and the Target is greater than zero, the Target is reset and an "audit-fail" event is raised. Since Batcher is a long-lived process, this audit helps ensure a broken process does not monopolize SharedCapacity when it isn't needed.

- __MaxOperationTime__ [DEFAULT: 1m]: This determines how long the system should wait for the done() function to be called on the batch before it assumes it is done and decreases the Target anyway. It is critical that the Target reflect the current cost of outstanding Operations. The MaxOperationTime ensures that a batch isn't orphaned and continues reserving capacity long after it is no longer needed. Please note there is also a MaxOperationTime on the Watcher, which takes precedence over this time.
- __MaxOperationTime__ [DEFAULT: 1m]: This determines how long the system should wait for the Watcher's callback function to complete before it assumes the batch is done and decreases the Target anyway. It is critical that the Target reflect the current cost of outstanding Operations. The MaxOperationTime ensures that a batch isn't orphaned and continues reserving capacity long after it is no longer needed. Please note there is also a MaxOperationTime on the Watcher, which takes precedence over this time.

- __PauseTime__ [DEFAULT: 500ms]: This determines how long the FlushInterval, CapacityInterval, and AuditInterval are paused when Batcher.Pause() is called. Typically you would pause because the datastore cannot keep up with the volume of requests (if this happens often, consider adjusting your rate limiter).

@@ -237,29 +236,27 @@ operation := gobatcher.NewOperation(&watcher, cost, payload).WithBatching(true)
Creating a new Watcher with all defaults might look like this...

```go
watcher := gobatcher.NewWatcher(func(batch []*gobatcher.Operation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// your processing function goes here
done() // marks all operations in the batch as done; reduces target
})
```

Creating with all available configuration options might look like this...

```go
watcher := gobatcher.NewWatcher(func(batch []*gobatcher.Operation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// your processing function goes here
done() // marks all operations in the batch as done; reduces target
}).
WithMaxAttempts(3).
WithMaxBatchSize(500).
WithMaxOperationTime(1 * time.Minute)
```

- __processing_func__ [REQUIRED]: To create a new Watcher, you must provide a function that accepts a batch of Operations and a done function. The provided function will be called as each batch is available for processing. As soon as you are done processing, you should call the provided done function; this reduces the Target by the cost of all Operations in the batch. If for some reason you don't call it, the Target will be reduced after MaxOperationTime. Every time this function is called with a batch it is run as a new goroutine, so anything inside could cause race conditions with the rest of your code; use atomic, sync, etc. as appropriate.
- __processing_func__ [REQUIRED]: To create a new Watcher, you must provide a callback function that accepts a batch of Operations. The provided function will be called as each batch is available for processing. When the callback function completes, the Target is reduced by the cost of all Operations in the batch. If for some reason the processing is "stuck" in this function, the Target will be reduced after MaxOperationTime. Every time this function is called with a batch it is run as a new goroutine, so anything inside could cause race conditions with the rest of your code; use atomic, sync, etc. as appropriate.

- __MaxAttempts__ [OPTIONAL]: If there are transient errors, you can enqueue the same Operation again. If you do not provide MaxAttempts, you may enqueue the same Operation as many times as you like. If you do specify MaxAttempts, the Enqueue() method will return `TooManyAttemptsError{}` once the Operation has been enqueued too many times. You could check the attempt count yourself before enqueuing, but this provides a simple pattern: always attempt to enqueue, then handle any errors.

- __MaxOperationTime__ [OPTIONAL]: This determines how long the system should wait for the done() function to be called on the batch before it assumes it is done and decreases the Target anyway. It is critical that the Target reflect the current cost of outstanding Operations. The MaxOperationTime ensures that a batch isn't orphaned and continues reserving capacity long after it is no longer needed. If MaxOperationTime is not provided on the Watcher, the Batcher MaxOperationTime is used.
- __MaxOperationTime__ [OPTIONAL]: This determines how long the system should wait for the callback function to be completed on the batch before it assumes it is done and decreases the Target anyway. It is critical that the Target reflect the current cost of outstanding Operations. The MaxOperationTime ensures that a batch isn't orphaned and continues reserving capacity long after it is no longer needed. If MaxOperationTime is not provided on the Watcher, the Batcher MaxOperationTime is used.

Creating a new ProvisionedResource might look like this...

2 changes: 0 additions & 2 deletions azure-blob-lease-manager.go
@@ -1,7 +1,5 @@
package batcher

// NOTE: please review this code which addresses using blob leases to share capacity

import (
"bytes"
"context"
2 changes: 0 additions & 2 deletions azure-shared-resource_test.go
@@ -1,7 +1,5 @@
package batcher_test

// NOTE: please review unit-tests

import (
"context"
"fmt"
11 changes: 7 additions & 4 deletions batcher.go
@@ -330,11 +330,14 @@ func (r *Batcher) Start() (err error) {
op.MakeAttempt()
}

// NOTE: done() is called by the user or happens after maxOperationTime
// process the batch
waitForDone := make(chan struct{})
watcher.ProcessBatch(batch, func() {
close(waitForDone)
})
go func() {
defer close(waitForDone)
watcher.ProcessBatch(batch)
}()

// the batch is "done" when the ProcessBatch func() finishes or the maxOperationTime is exceeded
maxOperationTime := r.maxOperationTime
if watcher.MaxOperationTime() > 0 {
maxOperationTime = watcher.MaxOperationTime()
103 changes: 38 additions & 65 deletions batcher_test.go
@@ -1,7 +1,5 @@
package batcher_test

// NOTE: please review unit-tests

import (
"fmt"
"sync"
@@ -17,9 +15,7 @@ func TestEnqueue(t *testing.T) {

t.Run("enqueue is allowed before startup", func(t *testing.T) {
batcher := gobatcher.NewBatcher()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
operation := gobatcher.NewOperation(watcher, 0, struct{}{}, false)
err := batcher.Enqueue(operation)
assert.NoError(t, err, "expect enqueue to be fine even if not started")
@@ -54,9 +50,7 @@ func TestEnqueue(t *testing.T) {
WithRateLimiter(res)
err := batcher.Start()
assert.NoError(t, err, "expecting no errors on startup")
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
operation := gobatcher.NewOperation(watcher, 2000, struct{}{}, false)
err = batcher.Enqueue(operation)
if err != nil {
@@ -72,9 +66,7 @@ func TestEnqueue(t *testing.T) {
WithRateLimiter(res)
err := batcher.Start()
assert.NoError(t, err, "expecting no errors on startup")
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
good := gobatcher.NewOperation(watcher, 11000, struct{}{}, false)
err = batcher.Enqueue(good)
assert.NoError(t, err)
@@ -111,9 +103,8 @@ func TestEnqueue(t *testing.T) {
assert.FailNow(t, "the max-attempts governor didn't work, we have tried too many times")
}
}
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
enqueue()
done()
}).WithMaxAttempts(3)
op = gobatcher.NewOperation(watcher, 100, struct{}{}, false)
enqueue()
@@ -134,8 +125,7 @@ func TestEnqueue(t *testing.T) {
var attempts uint32
func() {
count := 0
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
defer done()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
updateCountersMutex.Lock()
defer updateCountersMutex.Unlock()
for _, entry := range batch {
@@ -171,9 +161,7 @@ func TestEnqueue(t *testing.T) {

t.Run("enqueue will block caller if buffer full (default)", func(t *testing.T) {
batcher := gobatcher.NewBatcherWithBuffer(1)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
var err error
op1 := gobatcher.NewOperation(watcher, 0, struct{}{}, false)
err = batcher.Enqueue(op1)
@@ -198,9 +186,7 @@ func TestEnqueue(t *testing.T) {
t.Run("enqueue will throw error if buffer is full (config)", func(t *testing.T) {
batcher := gobatcher.NewBatcherWithBuffer(1).
WithErrorOnFullBuffer()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
var err error
op1 := gobatcher.NewOperation(watcher, 0, struct{}{}, false)
err = batcher.Enqueue(op1)
@@ -219,9 +205,7 @@ func TestOperationsInBuffer(t *testing.T) {

t.Run("enqueuing operations increases num in buffer", func(t *testing.T) {
batcher := gobatcher.NewBatcher()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
assert.NoError(t, err, "expecting no error on enqueue")
@@ -236,11 +220,10 @@ func TestOperationsInBuffer(t *testing.T) {
batcher := gobatcher.NewBatcher()
wg := sync.WaitGroup{}
wg.Add(4)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
for i := 0; i < len(batch); i++ {
wg.Done()
}
done()
})
for i := 0; i < 4; i++ {
op := gobatcher.NewOperation(watcher, 100, struct{}{}, batching)
@@ -263,9 +246,7 @@ func TestNeedsCapacity(t *testing.T) {

t.Run("cost updates the target", func(t *testing.T) {
batcher := gobatcher.NewBatcher()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
assert.NoError(t, err, "expecting no error on enqueue")
@@ -280,8 +261,7 @@ func TestNeedsCapacity(t *testing.T) {
batcher := gobatcher.NewBatcher()
wg := sync.WaitGroup{}
wg.Add(4)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
for i := 0; i < len(batch); i++ {
wg.Done()
}
@@ -319,8 +299,8 @@ func TestNeedsCapacity(t *testing.T) {
}
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// do not mark as done()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
time.Sleep(400 * time.Millisecond)
})
var err error
op1 := gobatcher.NewOperation(watcher, 800, struct{}{}, false)
@@ -356,8 +336,8 @@ func TestNeedsCapacity(t *testing.T) {
}
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// do not mark as done()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
time.Sleep(400 * time.Millisecond)
})
var err error
op1 := gobatcher.NewOperation(watcher, 800, struct{}{}, false)
@@ -510,9 +490,8 @@ func TestBatcherPause(t *testing.T) {
wg.Done()
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
assert.True(t, resumed, "all batches should be raised after resume")
done()
wg.Done()
})
batcher.Pause()
@@ -578,7 +557,7 @@ func TestBatcherStart(t *testing.T) {
wg.Add(2)
var op1, op2, op3 gobatcher.IOperation
var count uint32 = 0
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
atomic.AddUint32(&count, uint32(len(batch)))
switch len(batch) {
case 1:
@@ -587,7 +566,6 @@
assert.Equal(t, op1, batch[0], "expect that the batch has op1 and op3")
assert.Equal(t, op3, batch[1], "expect that the batch has op1 and op3")
}
done()
wg.Done()
})
op1 = gobatcher.NewOperation(watcher, 100, struct{}{}, true)
@@ -609,10 +587,9 @@
batcher := gobatcher.NewBatcher().
WithFlushInterval(1 * time.Millisecond)
var count uint32 = 0
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
atomic.AddUint32(&count, 1)
assert.Equal(t, 3, len(batch), "expect batches to have 3 operations each")
done()
}).WithMaxBatchSize(3)
for i := 0; i < 9; i++ {
op := gobatcher.NewOperation(watcher, 100, struct{}{}, true)
@@ -722,9 +699,8 @@ func TestTimers(t *testing.T) {
WithRateLimiter(res).
WithFlushInterval(d.interval)
var count uint32 = 0
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
atomic.AddUint32(&count, uint32(len(batch)))
done()
})
for i := 0; i < d.enqueue; i++ {
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
@@ -757,9 +733,7 @@ func TestTimers(t *testing.T) {
atomic.AddUint32(&count, 1)
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
op := gobatcher.NewOperation(watcher, 800, struct{}{}, false)
err := batcher.Enqueue(op)
assert.NoError(t, err, "not expecting an enqueue error")
@@ -770,11 +744,12 @@
})
}

t.Run("ensure abandoned operations are still marked done (watcher)", func(t *testing.T) {
t.Run("ensure long-running operations are still marked done (watcher)", func(t *testing.T) {
batcher := gobatcher.NewBatcher().
WithFlushInterval(1 * time.Millisecond)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// don't mark as done
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// NOTE: simulate a long-running operation
time.Sleep(100 * time.Millisecond)
}).WithMaxOperationTime(10 * time.Millisecond)
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
@@ -788,12 +763,13 @@ func TestTimers(t *testing.T) {
assert.Equal(t, uint32(0), after, "expecting 0 capacity request after max-operation-time")
})

t.Run("ensure abandoned operations are still marked done (batcher)", func(t *testing.T) {
t.Run("ensure long-running operations are still marked done (batcher)", func(t *testing.T) {
batcher := gobatcher.NewBatcher().
WithFlushInterval(1 * time.Millisecond).
WithMaxOperationTime(10 * time.Millisecond)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// don't mark as done
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// NOTE: simulate a long-running operation
time.Sleep(100 * time.Millisecond)
})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
@@ -807,11 +783,12 @@ func TestTimers(t *testing.T) {
assert.Equal(t, uint32(0), after, "expecting 0 capacity request after max-operation-time")
})

t.Run("ensure abandoned operations are not marked done before 1 min", func(t *testing.T) {
t.Run("ensure abandoned operations are not marked done before 1m default", func(t *testing.T) {
batcher := gobatcher.NewBatcher().
WithFlushInterval(1 * time.Millisecond)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// don't mark as done
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
// NOTE: simulate a long-running operation
time.Sleep(400 * time.Millisecond)
})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
Expand Down Expand Up @@ -843,10 +820,7 @@ func TestAudit(t *testing.T) {
atomic.AddUint32(&failed, 1)
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// NOTE: using a max-op-time of 1ms removes the targets whether done is called or not
done()
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
assert.NoError(t, err, "not expecting an enqueue error")
@@ -870,8 +844,8 @@ func TestAudit(t *testing.T) {
atomic.AddUint32(&failed, 1)
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// don't mark as done
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
time.Sleep(20 * time.Millisecond)
}).WithMaxOperationTime(1 * time.Minute)
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
err := batcher.Enqueue(op)
@@ -893,8 +867,8 @@ func TestAudit(t *testing.T) {
atomic.AddUint32(&skipped, 1)
}
})
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
// don't mark as done
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
time.Sleep(20 * time.Millisecond)
})
var err error
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)
@@ -914,8 +888,8 @@ func TestManualFlush(t *testing.T) {
err = batcher.Start()
assert.NoError(t, err, "not expecting a start error")
completed := make(chan bool, 1)
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation, done func()) {
done()
watcher := gobatcher.NewWatcher(func(batch []gobatcher.IOperation) {
completed <- true
})
op := gobatcher.NewOperation(watcher, 100, struct{}{}, false)