Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit d215b83

Browse files
author
woodsaj
committed
rate limit the amount of time pruning can lock the index for.
adds a new configSetting "max-prune-lock-time" that is the amount of time per second that the prune job can lock the index for. The default is 100ms, meaning that the index can only be locked for 10% of the time.
1 parent 5c1401b commit d215b83

File tree

9 files changed

+124
-5
lines changed

9 files changed

+124
-5
lines changed

docker/docker-chaos/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,5 @@ tag-support = false
376376
tag-query-workers = 50
377377
# size of regular expression cache in tag query evaluation
378378
match-cache-size = 1000
379+
# maximum duration each second a prune job can lock the index.
380+
max-prune-lock-time = 100ms

docker/docker-cluster/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,5 @@ tag-support = false
376376
tag-query-workers = 50
377377
# size of regular expression cache in tag query evaluation
378378
match-cache-size = 1000
379+
# maximum duration each second a prune job can lock the index.
380+
max-prune-lock-time = 100ms

docker/docker-dev-custom-cfg-kafka/metrictank.ini

+2
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,5 @@ tag-support = false
376376
tag-query-workers = 50
377377
# size of regular expression cache in tag query evaluation
378378
match-cache-size = 1000
379+
# maximum duration each second a prune job can lock the index.
380+
max-prune-lock-time = 100ms

docs/config.md

+2
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,8 @@ tag-support = false
441441
tag-query-workers = 50
442442
# size of regular expression cache in tag query evaluation
443443
match-cache-size = 1000
444+
# maximum duration each second a prune job can lock the index.
445+
max-prune-lock-time = 100ms
444446
```
445447

446448
# storage-schemas.conf

idx/memory/memory.go

+24-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package memory
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"regexp"
@@ -47,10 +48,11 @@ var (
4748
// metric idx.metrics_active is the number of currently known metrics in the index
4849
statMetricsActive = stats.NewGauge32("idx.metrics_active")
4950

50-
Enabled bool
51-
matchCacheSize int
52-
TagSupport bool
53-
TagQueryWorkers int // number of workers to spin up when evaluation tag expressions
51+
Enabled bool
52+
matchCacheSize int
53+
maxPruneLockTime = time.Millisecond * 100
54+
TagSupport bool
55+
TagQueryWorkers int // number of workers to spin up when evaluation tag expressions
5456
)
5557

5658
func ConfigSetup() {
@@ -59,6 +61,7 @@ func ConfigSetup() {
5961
memoryIdx.BoolVar(&TagSupport, "tag-support", false, "enables/disables querying based on tags")
6062
memoryIdx.IntVar(&TagQueryWorkers, "tag-query-workers", 50, "number of workers to spin up to evaluate tag queries")
6163
memoryIdx.IntVar(&matchCacheSize, "match-cache-size", 1000, "size of regular expression cache in tag query evaluation")
64+
memoryIdx.DurationVar(&maxPruneLockTime, "max-prune-lock-time", time.Millisecond*100, "Maximum duration each second a prune job can lock the index.")
6265
globalconf.Register("memory-idx", memoryIdx)
6366
}
6467

@@ -1308,13 +1311,24 @@ DEFS:
13081311
}
13091312
m.RUnlock()
13101313

1314+
ctx, cancel := context.WithCancel(context.Background())
1315+
defer cancel()
1316+
1317+
// create a new timeLimiter that allows us to limit the amount of time we spend
1318+
// holding a lock to maxPruneLockTime (default 100ms) every second.
1319+
tl := NewTimeLimiter(ctx, time.Second, maxPruneLockTime)
1320+
13111321
for org, ids := range toPruneTagged {
13121322
if len(ids) == 0 {
13131323
continue
13141324
}
1325+
// make sure we are not locking for too long.
1326+
tl.Wait()
1327+
lockStart := time.Now()
13151328
m.Lock()
13161329
defs := m.deleteTaggedByIdSet(org, ids)
13171330
m.Unlock()
1331+
tl.Add(time.Since(lockStart))
13181332
pruned = append(pruned, defs...)
13191333
}
13201334

@@ -1325,28 +1339,33 @@ ORGS:
13251339
}
13261340

13271341
for path := range paths {
1342+
tl.Wait()
1343+
lockStart := time.Now()
13281344
m.Lock()
13291345
tree, ok := m.tree[org]
13301346

13311347
if !ok {
13321348
m.Unlock()
1349+
tl.Add(time.Since(lockStart))
13331350
continue ORGS
13341351
}
13351352

13361353
n, ok := tree.Items[path]
13371354

13381355
if !ok {
13391356
m.Unlock()
1357+
tl.Add(time.Since(lockStart))
13401358
log.Debug("memory-idx: series %s for orgId:%d was identified for pruning but cannot be found.", path, org)
13411359
continue
13421360
}
13431361

13441362
log.Debug("memory-idx: series %s for orgId:%d is stale. pruning it.", n.Path, org)
13451363
defs := m.delete(org, n, true, false)
13461364
m.Unlock()
1365+
tl.Add(time.Since(lockStart))
13471366
pruned = append(pruned, defs...)
1348-
}
13491367

1368+
}
13501369
}
13511370

13521371
statMetricsActive.Add(-1 * len(pruned))

idx/memory/time_limit.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package memory
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
// TimeLimiter provides a means limit the amount of time spent working.TimeLimiter
10+
type TimeLimiter struct {
11+
sync.Mutex
12+
ctx context.Context
13+
lockDuration time.Duration
14+
window time.Duration
15+
limit time.Duration
16+
accountCh chan time.Duration
17+
queryCh chan chan struct{}
18+
}
19+
20+
// NewTimeLimiter creates a new TimeLimiter. A background thread will run until the
21+
// provided context is done. When the amount of time spent on task (the time is determined
22+
// by calls to "Add()") every "window" duration is more then "limit", then calls to
23+
// Wait() will block until the start if the next window period.
24+
func NewTimeLimiter(ctx context.Context, window, limit time.Duration) *TimeLimiter {
25+
l := &TimeLimiter{
26+
ctx: ctx,
27+
window: window,
28+
limit: limit,
29+
accountCh: make(chan time.Duration),
30+
queryCh: make(chan chan struct{}),
31+
}
32+
go l.run()
33+
return l
34+
}
35+
36+
func (l *TimeLimiter) run() {
37+
ticker := time.NewTicker(l.window)
38+
done := l.ctx.Done()
39+
var blockedQueries []chan struct{}
40+
for {
41+
select {
42+
case <-done:
43+
//context done. shutting down
44+
for _, ch := range blockedQueries {
45+
close(ch)
46+
}
47+
blockedQueries = nil
48+
return
49+
case <-ticker.C:
50+
// reset lockDuration to 0
51+
l.lockDuration = time.Duration(0)
52+
for _, ch := range blockedQueries {
53+
close(ch)
54+
}
55+
blockedQueries = nil
56+
case d := <-l.accountCh:
57+
l.lockDuration += d
58+
case respCh := <-l.queryCh:
59+
if l.lockDuration < l.limit {
60+
close(respCh)
61+
} else {
62+
// rate limit exceeded. On the next tick respCh will be closed
63+
// notifying the caller that they can continue.
64+
blockedQueries = append(blockedQueries, respCh)
65+
}
66+
}
67+
}
68+
}
69+
70+
// Add increments the counter of time spent doing something by "d"
71+
func (l *TimeLimiter) Add(d time.Duration) {
72+
l.accountCh <- d
73+
}
74+
75+
// Wait will return immediatly if we are not rate limited, otherwise it will
76+
// block until we are no longer limited. The longest we will block for is
77+
// the size of the defined time window.
78+
func (l *TimeLimiter) Wait() {
79+
respCh := make(chan struct{})
80+
l.queryCh <- respCh
81+
82+
// if we have not exceeded our locking quota then respCh will be
83+
// immediatly closed. Otherwise it wont be closed until the next tick (duration of "l.window")
84+
// and we will block until then.
85+
<-respCh
86+
}

metrictank-sample.ini

+2
Original file line numberDiff line numberDiff line change
@@ -379,3 +379,5 @@ tag-support = false
379379
tag-query-workers = 50
380380
# size of regular expression cache in tag query evaluation
381381
match-cache-size = 1000
382+
# maximum duration each second a prune job can lock the index.
383+
max-prune-lock-time = 100ms

scripts/config/metrictank-docker.ini

+2
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,5 @@ tag-support = false
376376
tag-query-workers = 50
377377
# size of regular expression cache in tag query evaluation
378378
match-cache-size = 1000
379+
# maximum duration each second a prune job can lock the index.
380+
max-prune-lock-time = 100ms

scripts/config/metrictank-package.ini

+2
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,5 @@ tag-support = false
376376
tag-query-workers = 50
377377
# size of regular expression cache in tag query evaluation
378378
match-cache-size = 1000
379+
# maximum duration each second a prune job can lock the index.
380+
max-prune-lock-time = 100ms

0 commit comments

Comments
 (0)