This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit ab9d95d

simplify TimeLimiter, fixing its tests
* The previous implementation was hastily written and much more complicated than needed: it tried to support concurrency but did not do it well, and we don't actually need any of it.
* Unit testing was very flaky; on CircleCI in particular there is a lot of timing inaccuracy, leading to broken tests.
* It was inaccurate. For short-lived operations the error is negligible, but for longer operations it is not. While the operations in our use case should be short, it is nice to fix this while we're at it.

So we simplify the design a lot: we reduce it to a non-concurrent accounting structure, removing all channels, goroutines and context, making the accounting more correct and the testing more robust.
1 parent 58e3ba8 commit ab9d95d

File tree: 3 files changed, +91 -126 lines changed
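
The commit reduces TimeLimiter to a plain accounting struct: the caller tells the limiter how much time an operation took via Add(), then calls Wait(), which sleeps only if the per-window budget has been exceeded. A minimal sketch of the intended call pattern (the loop and doWork() are hypothetical stand-ins, not part of this commit, and the snippet assumes it lives in the memory package):

	// allow at most 100ms of lock-holding work per 1s window
	tl := NewTimeLimiter(time.Second, 100*time.Millisecond, time.Now())

	for i := 0; i < 20; i++ {
		pre := time.Now()
		doWork()                // hypothetical short, serial operation
		tl.Add(time.Since(pre)) // account for the time just spent...
		tl.Wait()               // ...and sleep if the budget is now exceeded
	}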

idx/memory/memory.go
+1 -5

@@ -1,7 +1,6 @@
 package memory
 
 import (
-	"context"
 	"flag"
 	"fmt"
 	"regexp"
@@ -1316,12 +1315,9 @@ DEFS:
 	}
 	m.RUnlock()
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	// create a new timeLimiter that allows us to limit the amount of time we spend
 	// holding a lock to maxPruneLockTime (default 100ms) every second.
-	tl := NewTimeLimiter(ctx, time.Second, maxPruneLockTime)
+	tl := NewTimeLimiter(time.Second, maxPruneLockTime, time.Now())
 
 	for org, ids := range toPruneTagged {
 		if len(ids) == 0 {

idx/memory/time_limit.go
+66 -69

@@ -1,94 +1,91 @@
 package memory
 
 import (
-	"context"
-	"sync"
 	"time"
 )
 
-// TimeLimiter limits the rate of a set of operations.
-// It does this by slowing down further operations as soon
+// TimeLimiter limits the rate of a set of serial operations.
+// It does this by tracking how much time has been spent (updated via Add()),
+// and comparing this to the window size and the limit, slowing down further operations as soon
 // as one Add() is called informing it the per-window allowed budget has been exceeded.
 // Limitations:
-// * concurrently running operations can all exceed the budget,
-//   so it works best for serial operations.
-// * for serial operations, the last operation is allowed to exceed the budget
-// * when an operation takes very long (e.g. 10 seconds, with a 100ms limit per second), it
-//   is counted as exceeding the 100ms budget, but no other provisions are being made.
+// * the last operation is allowed to exceed the budget (but the next call will be delayed to compensate)
+// * concurrency is not supported
 //
-// Thus, TimeLimiter is designed for, and works best with, serially running operations,
-// each of which takes a fraction of the limit.
+// For correctness, you should always follow up an Add() with a Wait()
 type TimeLimiter struct {
-	sync.Mutex
-	ctx       context.Context
+	since     time.Time
+	next      time.Time
 	timeSpent time.Duration
 	window    time.Duration
 	limit     time.Duration
-	addCh     chan time.Duration
-	queryCh   chan chan struct{}
 }
 
-// NewTimeLimiter creates a new TimeLimiter. A background goroutine will run until the
-// provided context is done. When the amount of time spent on task (the time is determined
-// by calls to "Add()") every "window" duration is more than "limit", then calls to
-// Wait() will block until the start of the next window period.
-func NewTimeLimiter(ctx context.Context, window, limit time.Duration) *TimeLimiter {
-	l := &TimeLimiter{
-		ctx:     ctx,
-		window:  window,
-		limit:   limit,
-		addCh:   make(chan time.Duration),
-		queryCh: make(chan chan struct{}),
-	}
-	go l.run()
-	return l
-}
-
-func (l *TimeLimiter) run() {
-	ticker := time.NewTicker(l.window)
-	done := l.ctx.Done()
-	var blockedQueries []chan struct{}
-	for {
-		select {
-		case <-done:
-			// context done. shutting down
-			for _, ch := range blockedQueries {
-				close(ch)
-			}
-			return
-		case <-ticker.C:
-			l.timeSpent = 0
-			for _, ch := range blockedQueries {
-				close(ch)
-			}
-			blockedQueries = blockedQueries[:0]
-		case d := <-l.addCh:
-			l.timeSpent += d
-		case respCh := <-l.queryCh:
-			if l.timeSpent < l.limit {
-				close(respCh)
-			} else {
-				// rate limit exceeded. On the next tick respCh will be closed
-				// notifying the caller that they can continue.
-				blockedQueries = append(blockedQueries, respCh)
-			}
-		}
+// NewTimeLimiter creates a new TimeLimiter.
+func NewTimeLimiter(window, limit time.Duration, now time.Time) *TimeLimiter {
+	l := TimeLimiter{
+		since:  now,
+		next:   now.Add(window),
+		window: window,
+		limit:  limit,
 	}
+	return &l
 }
 
 // Add increments the "time spent" counter by "d"
 func (l *TimeLimiter) Add(d time.Duration) {
-	l.addCh <- d
+	l.add(time.Now(), d)
+}
+
+// add increments the "time spent" counter by "d" at a given time
+func (l *TimeLimiter) add(now time.Time, d time.Duration) {
+	if now.After(l.next) {
+		l.timeSpent = d
+		l.since = now.Add(-d)
+		l.next = l.since.Add(l.window)
+		return
+	}
+	l.timeSpent += d
 }
 
-// Wait returns when we are not rate limited, which may be
-// anywhere between immediately or after the window.
+// Wait returns when we are not rate limited
+// * if we passed the window, we reset everything (this is only safe for callers
+//   that behave correctly, i.e. that wait the instructed time after each add)
+// * if limit is not reached, no sleep is needed
+// * if limit has been exceeded, sleep until next period + extra multiple to compensate.
+//   this is perhaps best explained with an example:
+//   if window is 1s and limit 100ms, but we spent 250ms, then we spent effectively 2.5 seconds worth of work.
+//   let's say we are 800ms into the 1s window, that means we should sleep 2500-800 = 1.7s
+//   in order to maximize work while honoring the imposed limit.
+// * if limit has been met exactly, sleep until next period (this is a special case of the above)
 func (l *TimeLimiter) Wait() {
-	respCh := make(chan struct{})
-	l.queryCh <- respCh
+	time.Sleep(l.wait(time.Now()))
+}
+
+// wait returns how long should be slept at a given time. See Wait() for more info
+func (l *TimeLimiter) wait(now time.Time) time.Duration {
 
-	// if we have not exceeded our locking quota then respCh will be
-	// immediately closed. Otherwise it won't be closed until the next tick (duration of "l.window")
-	// and we will block until then.
-	<-respCh
+	// if we passed the window, reset and start over
+	// if clock is adjusted backwards, best we can do is also just reset and start over
+	if now.After(l.next) || now.Before(l.since) {
+		l.timeSpent = 0
+		l.since = now
+		l.next = now.Add(l.window)
+		return 0
+	}
+	if l.timeSpent < l.limit {
+		return 0
+	}
+
+	// now <= next
+	// now >= since
+	// timeSpent >= limit
+	multiplier := l.window / l.limit
+	timeToPass := l.timeSpent * multiplier
+	timePassed := now.Sub(l.since)
+	// not sure if this should happen, but let's be safe anyway
+	if timePassed >= timeToPass {
+		return 0
+	}
+	return timeToPass - timePassed
}
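
As a sanity check of the compensation rule described in the Wait() comment, the example can be replayed against the package-internal add()/wait() using an explicit clock (a sketch restating the comment's numbers, not code from this commit):

	base := time.Unix(0, 0)
	tl := NewTimeLimiter(time.Second, 100*time.Millisecond, base)

	// 250ms of work finishing 800ms into the window: at the 10x multiplier this
	// is worth 2500ms of wall time, of which 800ms have already passed.
	tl.add(base.Add(800*time.Millisecond), 250*time.Millisecond)
	dur := tl.wait(base.Add(800*time.Millisecond)) // 2500ms - 800ms = 1.7s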

idx/memory/time_limit_test.go
+24 -52

@@ -1,71 +1,43 @@
 package memory
 
 import (
-	"context"
 	"testing"
 	"time"
 )
 
-func shouldTakeAbout(t *testing.T, fn func(), expDur time.Duration, sloppynessFactor int, info string) {
-	// on Dieter's laptop:
-	// takes about <=15 micros for add/wait sequences
-	// takes about 150 micros for an add + blocking wait
-	// on circleCI, takes 75 micros for add/wait sequence
-	slop := time.Duration(sloppynessFactor) * time.Microsecond
-	pre := time.Now()
-	fn()
-	dur := time.Since(pre)
-	if dur > expDur+slop || dur < expDur-slop {
-		t.Fatalf("scenario %s. was supposed to take %s, but took %s", info, expDur, dur)
+var now = time.Unix(10, 0)
+
+func shouldTake(t *testing.T, tl *TimeLimiter, workDone, expDur time.Duration, info string) {
+
+	// account for work done, as well as moving our clock forward by the same amount
+	now = now.Add(workDone)
+	tl.add(now, workDone)
+
+	dur := tl.wait(now)
+	if dur != expDur {
+		t.Fatalf("scenario %s. expected wait %s, got wait %s", info, expDur, dur)
 	}
+
+	// fake the "sleep" so we're a properly behaving caller
+	now = now.Add(dur)
 }
 
 func TestTimeLimiter(t *testing.T) {
 	window := time.Second
 	limit := 100 * time.Millisecond
 
-	ctx, cancel := context.WithCancel(context.Background())
-	tl := NewTimeLimiter(ctx, window, limit)
+	tl := NewTimeLimiter(window, limit, now)
 
 	// TEST 1 : Start first window by doing work and seeing when it starts blocking
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 0 - wait should be 0")
-
-	tl.Add(5 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 5ms - wait should be 0")
-
-	tl.Add(10 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 15ms - wait should be 0")
-
-	tl.Add(80 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 95ms - wait should be 0")
-
-	tl.Add(4 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 1: work done: 99ms - wait should be 0")
-
-	tl.Add(3 * time.Millisecond)
-
-	shouldTakeAbout(t, tl.Wait, time.Second, 500, "window 1: work done: 102ms - almost no time has passed, so wait should be full window")
+	shouldTake(t, tl, 0, 0, "window 1: work done: 0")
+	shouldTake(t, tl, 5*time.Millisecond, 0, "window 1: work done: 5ms")
+	shouldTake(t, tl, 10*time.Millisecond, 0, "window 1: work done: 15ms")
+	shouldTake(t, tl, 80*time.Millisecond, 0, "window 1: work done: 95ms")
+	shouldTake(t, tl, 4*time.Millisecond, 0, "window 1: work done: 99ms")
+	shouldTake(t, tl, 3*time.Millisecond, 918*time.Millisecond, "window 1: work done: 102ms")
 
 	// TEST 2 : Now that we waited until a full window, should be able to do up to limit work again
-	tl.Add(50 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 2: work done: 50ms - wait should be 0")
-
-	tl.Add(40 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 2: work done: 90ms - wait should be 0")
-
-	tl.Add(40 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, time.Second, 500, "window 2: work done: 130ms - wait should be 1s")
-
-	// TEST 3 : Now that we waited until a full window, should be able to do up to limit work again
-	// but this time we cancel, so we don't have to wait as long
-	tl.Add(50 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 3: work done: 50ms - wait should be 0")
-
-	tl.Add(40 * time.Millisecond)
-	shouldTakeAbout(t, tl.Wait, 0, 100, "window 3: work done: 90ms - wait should be 0")
-
-	tl.Add(40 * time.Millisecond)
-
-	time.AfterFunc(500*time.Millisecond, cancel)
-	shouldTakeAbout(t, tl.Wait, 500*time.Millisecond, 500, "window 3: work done: 130ms, canceling after 500ms - wait should be 500ms")
+	shouldTake(t, tl, 50*time.Millisecond, 0, "window 2: work done: 50ms")
+	shouldTake(t, tl, 40*time.Millisecond, 0, "window 2: work done: 90ms")
+	shouldTake(t, tl, 40*time.Millisecond, 1170*time.Millisecond, "window 2: work done: 130ms")
 }
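
The expected waits follow directly from the new accounting: after 102ms of work, 102ms * (1s / 100ms) = 1020ms of wall time must pass before work may resume, and 102ms of it already has, hence the 918ms wait; likewise in window 2, 130ms * 10 - 130ms = 1170ms. Because the clock is injected, further scenarios stay fully deterministic, for example (a hypothetical extra case, not part of this commit):

	// TEST 3 (sketch): after sitting idle well past the window, wait()
	// resets the accounting and no sleep is needed
	now = now.Add(5 * time.Second)
	if dur := tl.wait(now); dur != 0 {
		t.Fatalf("expected wait 0 after idle period, got wait %s", dur)
	}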
