Skip to content

Commit b538f3e

Browse files
committed
bounded percentiles throttler implementation
1 parent 1b50937 commit b538f3e

File tree

5 files changed

+75
-67
lines changed

5 files changed

+75
-67
lines changed

README.MD

+2-2
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ func maxsearch(ctx context.Context, s searcher, topics []string, max uint64) (ur
154154
| buffered | `NewThrottlerBuffered(threshold uint64)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again. |
155155
| priority | `NewThrottlerPriority(threshold uint64, levels uint8)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again.<br> Running quota is not equally distributed between *n* levels of priority defined by the specified levels.<br> Use `WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. |
156156
| timed | `NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration)` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates. |
157-
| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration. |
158-
| percentile | `NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> If retention is set then throttler state will be reseted after retention duration. |
157+
| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration.<br> Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
158+
| percentile | `NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity. <br> If retention is set then throttler state will be reseted after retention duration.<br> Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
159159
| monitor | `NewThrottlerMonitor(mnt Monitor, threshold Stats)` | Throttles call if any of the stats returned by the provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred. |
160160
| metric | `NewThrottlerMetric(mtc Metric)` | Throttles call if metric defined by the specified metric is riched or if any internal error occurred. |
161161
| enqueuer | `NewThrottlerEnqueue(enq Enqueuer)` | Always enqueues message to the specified queue throttles only if any internal error occurred.<br> Use `WithData(ctx context.Context, data interface{}) context.Context` to specify context data for enqueued message and `WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context data marshaler. |

heap.go

-55
This file was deleted.

percentiles.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package gohalt
2+
3+
import (
4+
"math"
5+
"sort"
6+
"sync"
7+
)
8+
9+
type percentiles struct {
10+
buf []uint64
11+
cap uint8
12+
lock sync.Mutex
13+
}
14+
15+
func (p *percentiles) Len() int {
16+
p.lock.Lock()
17+
defer p.lock.Unlock()
18+
return len(p.buf)
19+
}
20+
21+
func (p *percentiles) Push(dim uint64) {
22+
p.lock.Lock()
23+
defer p.lock.Unlock()
24+
if len(p.buf) >= int(p.cap) {
25+
p.buf = p.buf[1:]
26+
}
27+
p.buf = append(p.buf, dim)
28+
}
29+
30+
func (p *percentiles) At(pval float64) uint64 {
31+
p.lock.Lock()
32+
defer p.lock.Unlock()
33+
buf := make([]uint64, len(p.buf))
34+
_ = copy(buf, p.buf)
35+
sort.Slice(buf, func(i, j int) bool {
36+
return buf[i] < buf[j]
37+
})
38+
at := int(math.Round(float64(len(buf)-1) * pval))
39+
return buf[at]
40+
}
41+
42+
func (p *percentiles) Prune() {
43+
p.lock.Lock()
44+
defer p.lock.Unlock()
45+
p.buf = make([]uint64, 0, p.cap)
46+
}

throttlers.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package gohalt
22

33
import (
4-
"container/heap"
54
"context"
65
"errors"
76
"fmt"
@@ -392,18 +391,20 @@ func (thr *tlatency) Release(ctx context.Context) error {
392391

393392
type tpercentile struct {
394393
reset Runnable
395-
latencies *blatheap
394+
latencies *percentiles
396395
threshold time.Duration
397396
percentile float64
398397
retention time.Duration
399398
}
400399

401-
func NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration) tpercentile {
400+
func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) tpercentile {
402401
percentile = math.Abs(percentile)
403402
if percentile > 1.0 {
404403
percentile = 1.0
405404
}
406-
thr := tpercentile{latencies: &blatheap{}, threshold: threshold, percentile: percentile, retention: retention}
405+
thr := tpercentile{threshold: threshold, percentile: percentile, retention: retention}
406+
thr.latencies = &percentiles{cap: capacity}
407+
thr.latencies.Prune()
407408
thr.reset = locked(
408409
delayed(thr.retention, func(context.Context) error {
409410
thr.latencies.Prune()
@@ -414,9 +415,8 @@ func NewThrottlerPercentile(threshold time.Duration, percentile float64, retenti
414415
}
415416

416417
func (thr tpercentile) Acquire(ctx context.Context) error {
417-
if length := thr.latencies.Len(); length > 0 {
418-
at := int(math.Round(float64(length-1) * thr.percentile))
419-
if latency := thr.latencies.At(at); latency >= uint64(thr.threshold) {
418+
if thr.latencies.Len() > 0 {
419+
if latency := thr.latencies.At(thr.percentile); latency >= uint64(thr.threshold) {
420420
gorun(ctx, thr.reset)
421421
return errors.New("throttler has exceed latency threshold")
422422
}
@@ -428,7 +428,7 @@ func (thr tpercentile) Release(ctx context.Context) error {
428428
nowTs := time.Now().UTC().UnixNano()
429429
ctxTs := ctxTimestamp(ctx).UnixNano()
430430
latency := uint64(nowTs - ctxTs)
431-
heap.Push(thr.latencies, latency)
431+
thr.latencies.Push(latency)
432432
return nil
433433
}
434434

throttlers_test.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ func TestThrottlers(t *testing.T) {
434434
},
435435
"Throttler percentile should throttle on latency above threshold": {
436436
tms: 5,
437-
thr: NewThrottlerPercentile(ms3_0, 0.5, ms7_0),
437+
thr: NewThrottlerPercentile(ms3_0, 10, 0.5, ms7_0),
438438
tss: []time.Duration{
439439
ms0_0,
440440
-ms5_0,
@@ -451,7 +451,7 @@ func TestThrottlers(t *testing.T) {
451451
},
452452
"Throttler percentile should throttle on latency above threshold after retention": {
453453
tms: 5,
454-
thr: NewThrottlerPercentile(ms3_0, 1.5, ms5_0),
454+
thr: NewThrottlerPercentile(ms3_0, 10, 1.5, ms5_0),
455455
tss: []time.Duration{
456456
ms0_0,
457457
-ms5_0,
@@ -473,6 +473,23 @@ func TestThrottlers(t *testing.T) {
473473
nil,
474474
},
475475
},
476+
"Throttler percentile should throttle on latency above threshold with capacity": {
477+
tms: 5,
478+
thr: NewThrottlerPercentile(ms3_0, 1, 0.5, ms7_0),
479+
tss: []time.Duration{
480+
ms0_0,
481+
-ms5_0,
482+
-ms1_0,
483+
-ms5_0,
484+
},
485+
errs: []error{
486+
nil,
487+
nil,
488+
errors.New("throttler has exceed latency threshold"),
489+
nil,
490+
errors.New("throttler has exceed latency threshold"),
491+
},
492+
},
476493
"Throttler adaptive should throttle on throttling adoptee": {
477494
tms: 3,
478495
thr: NewThrottlerAdaptive(

0 commit comments

Comments
 (0)