Skip to content

Commit

Permalink
Fix not working slack.
Browse files Browse the repository at this point in the history
As reported in #23 and #27 slack is currently broken in the atomic
implementation.

Both #23 and #27 propose valid solutions, and I actually arrived
at #23 when trying to solve this independently, but I find #27
slightly more readable/understandable, so going with that.

Also, #23 seems to expose another bug in andres-erbsen/clock, with
sleeping with negative time, so we should probably consider switching
to benbjohnson/clock - maybe in a follow up.
  • Loading branch information
rabbbit committed Jan 3, 2021
1 parent 4fc173c commit ae17f54
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
15 changes: 11 additions & 4 deletions limiter_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,21 @@ func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
newState := state{}
taken := false
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()

previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)

newState = state{}
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
newState.last = now

// If this is our first request, then we allow it.
Expand All @@ -93,9 +99,10 @@ func (t *atomicLimiter) Take() time.Time {
}
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(newState.sleepFor)
t.clock.Sleep(interval)
return newState.last
}
29 changes: 26 additions & 3 deletions ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,20 @@ func TestDelayedRateLimiter(t *testing.T) {
slow := r.createLimiter(10, ratelimit.WithoutSlack)
fast := r.createLimiter(100, ratelimit.WithoutSlack)

// Run a slow startTaking
r.startTaking(slow, fast)

// Accumulate slack for 10 seconds,
r.afterFunc(20*time.Second, func() {
// Then start working.
r.startTaking(fast)
r.startTaking(fast)
r.startTaking(fast)
r.startTaking(fast)
})

// Slow limiter allows 10 per second, so 100.
r.assertCountAt(10*time.Second, 100)
// Another 10 seconds, so we're at 200.
r.assertCountAt(20*time.Second, 200)
// Now the fast limiter goes at 100/sec, so another 1000.
r.assertCountAt(30*time.Second, 1200)
})
}
Expand All @@ -192,3 +194,24 @@ func TestPer(t *testing.T) {
r.assertCountAt(2*time.Minute, 15)
})
}

func TestSlack(t *testing.T) {
runTest(t, func(r runner) {
slow := r.createLimiter(10, ratelimit.WithoutSlack)
// Defaults to 10 slack.
fast := r.createLimiter(100)

r.startTaking(slow, fast)

r.afterFunc(1*time.Second, func() {
r.startTaking(fast)
r.startTaking(fast)
})

// limiter with 10hz dominates here - we're just at 10.
r.assertCountAt(1*time.Second, 10)
// limiter with 100hz dominates, so we're at 110,
// but we get extra 10 from accumulated slack
r.assertCountAt(2*time.Second, 120)
})
}

0 comments on commit ae17f54

Please sign in to comment.